http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/scalar-fn-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc
index 59aac6d..73b497d 100644
--- a/be/src/exprs/scalar-fn-call.cc
+++ b/be/src/exprs/scalar-fn-call.cc
@@ -30,7 +30,7 @@
 #include "codegen/codegen-anyval.h"
 #include "codegen/llvm-codegen.h"
 #include "exprs/anyval-util.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/lib-cache.h"
 #include "runtime/runtime-state.h"
@@ -57,7 +57,7 @@ using std::pair;
 #define MAX_INTERP_ARGS 20
 
 ScalarFnCall::ScalarFnCall(const TExprNode& node)
-  : Expr(node),
+  : ScalarExpr(node),
     vararg_start_idx_(node.__isset.vararg_start_idx ? node.vararg_start_idx : 
-1),
     scalar_fn_wrapper_(NULL),
     prepare_fn_(NULL),
@@ -78,9 +78,9 @@ Status ScalarFnCall::LoadPrepareAndCloseFn(LlvmCodeGen* 
codegen) {
   return Status::OK();
 }
 
-Status ScalarFnCall::Prepare(RuntimeState* state, const RowDescriptor& desc,
-    ExprContext* context) {
-  RETURN_IF_ERROR(Expr::Prepare(state, desc, context));
+Status ScalarFnCall::Init(const RowDescriptor& desc, RuntimeState* state) {
+  // Initialize children first.
+  RETURN_IF_ERROR(ScalarExpr::Init(desc, state));
 
   if (fn_.scalar_fn.symbol.empty()) {
     // This path is intended to only be used during development to test FE
@@ -94,17 +94,11 @@ Status ScalarFnCall::Prepare(RuntimeState* state, const 
RowDescriptor& desc,
   }
 
   // Check if the function takes CHAR as input or returns CHAR.
-  FunctionContext::TypeDesc return_type = 
AnyValUtil::ColumnTypeToTypeDesc(type_);
-  vector<FunctionContext::TypeDesc> arg_types;
   bool has_char_arg_or_result = type_.type == TYPE_CHAR;
-  for (int i = 0; i < children_.size(); ++i) {
-    arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(children_[i]->type_));
+  for (int i = 0; !has_char_arg_or_result && i < children_.size(); ++i) {
     has_char_arg_or_result |= children_[i]->type_.type == TYPE_CHAR;
   }
 
-  fn_context_index_ =
-      context->Register(state, return_type, arg_types, 
ComputeVarArgsBufferSize());
-
   // Use the interpreted path and call the builtin without codegen if any of 
the
   // followings is true:
   // 1. codegen is disabled by query option
@@ -155,50 +149,63 @@ Status ScalarFnCall::Prepare(RuntimeState* state, const 
RowDescriptor& desc,
     state->AddScalarFnToCodegen(this);
   }
 
-  // For IR UDF, the loading of the Prepare() and Close() functions is 
deferred until
+  // For IR UDF, the loading of the Init() and CloseContext() functions is 
deferred until
   // first time GetCodegendComputeFn() is invoked.
   if (!is_ir_udf) RETURN_IF_ERROR(LoadPrepareAndCloseFn(NULL));
   return Status::OK();
 }
 
-Status ScalarFnCall::Open(RuntimeState* state, ExprContext* ctx,
-    FunctionContext::FunctionStateScope scope) {
+Status ScalarFnCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
+    RuntimeState* state, ScalarExprEvaluator* eval) const {
   // Opens and inits children
-  RETURN_IF_ERROR(Expr::Open(state, ctx, scope));
-  FunctionContext* fn_ctx = ctx->fn_context(fn_context_index_);
+  RETURN_IF_ERROR(ScalarExpr::OpenEvaluator(scope, state, eval));
+  DCHECK_GE(fn_ctx_idx_, 0);
+  FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);
+  bool is_interpreted = scalar_fn_wrapper_ == nullptr;
 
-  if (scalar_fn_wrapper_ == NULL) {
+  if (is_interpreted) {
     // We're in the interpreted path (i.e. no JIT). Populate our 
FunctionContext's
     // staging_input_vals, which will be reused across calls to scalar_fn_.
-    DCHECK(scalar_fn_ != NULL);
+    DCHECK(scalar_fn_ != nullptr);
     vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals();
     for (int i = 0; i < NumFixedArgs(); ++i) {
       AnyVal* input_val;
-      RETURN_IF_ERROR(AllocateAnyVal(state, ctx->pool_.get(), 
children_[i]->type(),
+      RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), 
children_[i]->type(),
           "Could not allocate expression value", &input_val));
       input_vals->push_back(input_val);
     }
   }
 
   // Only evaluate constant arguments at the top level of function contexts.
-  // If 'ctx' was cloned, the constant values were copied from the parent.
+  // If 'eval' was cloned, the constant values were copied from the parent.
   if (scope == FunctionContext::FRAGMENT_LOCAL) {
     vector<AnyVal*> constant_args;
-    for (int i = 0; i < children_.size(); ++i) {
+    for (const ScalarExpr* child : children()) {
       AnyVal* const_val;
-      RETURN_IF_ERROR(children_[i]->GetConstVal(state, ctx, &const_val));
+      RETURN_IF_ERROR(eval->GetConstValue(state, *child, &const_val));
       constant_args.push_back(const_val);
     }
     fn_ctx->impl()->SetConstantArgs(move(constant_args));
+
+    // If we're calling MathFunctions::RoundUpTo(), we need to set 
output_scale_
+    // which determines how many decimal places are printed.
+    // TODO: Move this to Expr initialization.
+    if (this == &eval->root()) {
+      if (fn_.name.function_name == "round" && type_.type == TYPE_DOUBLE) {
+        DCHECK_EQ(children_.size(), 2);
+        IntVal* scale_arg = reinterpret_cast<IntVal*>(constant_args[1]);
+        if (scale_arg != nullptr) eval->output_scale_ = scale_arg->val;
+      }
+    }
   }
 
-  if (scalar_fn_wrapper_ == NULL) {
+  if (is_interpreted) {
     // Now we have the constant values, cache them so that the interpreted 
path can
     // call the UDF without reevaluating the arguments. 'staging_input_vals' 
and
     // 'varargs_buffer' in the FunctionContext are used to pass fixed and 
variable-length
     // arguments respectively. 'non_constant_args()' in the FunctionContext 
will contain
     // pointers to the remaining (non-constant) children that are evaluated 
for every row.
-    vector<pair<Expr*, AnyVal*>> non_constant_args;
+    vector<pair<ScalarExpr*, AnyVal*>> non_constant_args;
     uint8_t* varargs_buffer = fn_ctx->impl()->varargs_buffer();
     for (int i = 0; i < children_.size(); ++i) {
       AnyVal* input_arg;
@@ -222,7 +229,7 @@ Status ScalarFnCall::Open(RuntimeState* state, ExprContext* 
ctx,
       // be able to trust is_constant() and switch back to that.
       if (children_[i]->IsLiteral()) {
         const AnyVal* constant_arg = fn_ctx->impl()->constant_args()[i];
-        DCHECK(constant_arg != NULL);
+        DCHECK(constant_arg != nullptr);
         memcpy(input_arg, constant_arg, arg_bytes);
       } else {
         non_constant_args.emplace_back(children_[i], input_arg);
@@ -231,7 +238,7 @@ Status ScalarFnCall::Open(RuntimeState* state, ExprContext* 
ctx,
     fn_ctx->impl()->SetNonConstantArgs(move(non_constant_args));
   }
 
-  if (prepare_fn_ != NULL) {
+  if (prepare_fn_ != nullptr) {
     if (scope == FunctionContext::FRAGMENT_LOCAL) {
       prepare_fn_(fn_ctx, FunctionContext::FRAGMENT_LOCAL);
       if (fn_ctx->has_error()) return Status(fn_ctx->error_msg());
@@ -240,31 +247,20 @@ Status ScalarFnCall::Open(RuntimeState* state, 
ExprContext* ctx,
     if (fn_ctx->has_error()) return Status(fn_ctx->error_msg());
   }
 
-  // If we're calling MathFunctions::RoundUpTo(), we need to set 
output_scale_, which
-  // determines how many decimal places are printed.
-  // TODO: revisit this. We should be able to do this if the scale argument is
-  // non-constant.
-  if (fn_.name.function_name == "round" && type_.type == TYPE_DOUBLE) {
-    DCHECK_EQ(children_.size(), 2);
-    if (children_[1]->is_constant()) {
-      IntVal scale_arg = children_[1]->GetIntVal(ctx, NULL);
-      output_scale_ = scale_arg.val;
-    }
-  }
-
   return Status::OK();
 }
 
-void ScalarFnCall::Close(RuntimeState* state, ExprContext* context,
-                         FunctionContext::FunctionStateScope scope) {
-  if (fn_context_index_ != -1 && close_fn_ != NULL) {
-    FunctionContext* fn_ctx = context->fn_context(fn_context_index_);
+void ScalarFnCall::CloseEvaluator(FunctionContext::FunctionStateScope scope,
+    RuntimeState* state, ScalarExprEvaluator* eval) const {
+  DCHECK_GE(fn_ctx_idx_, 0);
+  if (close_fn_ != NULL) {
+    FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);
     close_fn_(fn_ctx, FunctionContext::THREAD_LOCAL);
     if (scope == FunctionContext::FRAGMENT_LOCAL) {
       close_fn_(fn_ctx, FunctionContext::FRAGMENT_LOCAL);
     }
   }
-  Expr::Close(state, context, scope);
+  ScalarExpr::CloseEvaluator(scope, state, eval);
 }
 
 // Dynamically loads the pre-compiled UDF and codegens a function that calls 
each child's
@@ -336,8 +332,8 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* 
codegen, Function** fn) {
   fn_name << udf->getName().str() << "Wrapper";
 
   Value* args[2];
-  *fn = CreateIrFunctionPrototype(codegen, fn_name.str(), &args);
-  Value* expr_ctx = args[0];
+  *fn = CreateIrFunctionPrototype(fn_name.str(), codegen, &args);
+  Value* eval = args[0];
   Value* row = args[1];
   BasicBlock* block = BasicBlock::Create(codegen->context(), "entry", *fn);
   LlvmBuilder builder(block);
@@ -346,12 +342,11 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* 
codegen, Function** fn) {
   vector<Value*> udf_args;
 
   // First argument is always FunctionContext*.
-  // Index into our registered offset in the ExprContext.
-  Value* expr_ctx_gep = builder.CreateStructGEP(NULL, expr_ctx, 1, 
"expr_ctx_gep");
-  Value* fn_ctxs_base = builder.CreateLoad(expr_ctx_gep, "fn_ctxs_base");
+  // Index into our registered offset in the ScalarFnEvaluator.
+  Value* eval_gep = builder.CreateStructGEP(NULL, eval, 1, "eval_gep");
+  Value* fn_ctxs_base = builder.CreateLoad(eval_gep, "fn_ctxs_base");
   // Use GEP to add our index to the base pointer
-  Value* fn_ctx_ptr =
-      builder.CreateConstGEP1_32(fn_ctxs_base, fn_context_index_, 
"fn_ctx_ptr");
+  Value* fn_ctx_ptr = builder.CreateConstGEP1_32(fn_ctxs_base, fn_ctx_idx_, 
"fn_ctx_ptr");
   Value* fn_ctx = builder.CreateLoad(fn_ctx_ptr, "fn_ctx");
   udf_args.push_back(fn_ctx);
 
@@ -371,16 +366,17 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* 
codegen, Function** fn) {
   for (int i = 0; i < GetNumChildren(); ++i) {
     Function* child_fn = NULL;
     vector<Value*> child_fn_args;
-    // Set 'child_fn' to the codegen'd function, sets child_fn = NULL if 
codegen fails
-    children_[i]->GetCodegendComputeFn(codegen, &child_fn);
-    if (child_fn == NULL) {
+    // Set 'child_fn' to the codegen'd function, sets child_fn == NULL if 
codegen fails
+    Status status = children_[i]->GetCodegendComputeFn(codegen, &child_fn);
+    if (UNLIKELY(!status.ok())) {
+      DCHECK(child_fn == NULL);
       // Set 'child_fn' to the interpreted function
       child_fn = GetStaticGetValWrapper(children_[i]->type(), codegen);
       // First argument to interpreted function is children_[i]
-      Type* expr_ptr_type = codegen->GetPtrType(Expr::LLVM_CLASS_NAME);
+      Type* expr_ptr_type = codegen->GetPtrType(ScalarExpr::LLVM_CLASS_NAME);
       child_fn_args.push_back(codegen->CastPtrToLlvmPtr(expr_ptr_type, 
children_[i]));
     }
-    child_fn_args.push_back(expr_ctx);
+    child_fn_args.push_back(eval);
     child_fn_args.push_back(row);
 
     // Call 'child_fn', adding the result to either 'udf_args' or 
'varargs_buffer'
@@ -453,20 +449,21 @@ Status ScalarFnCall::GetFunction(LlvmCodeGen* codegen, 
const string& symbol, voi
 }
 
 void ScalarFnCall::EvaluateNonConstantChildren(
-    ExprContext* context, const TupleRow* row) {
-  FunctionContext* fn_ctx = context->fn_context(fn_context_index_);
-  for (pair<Expr*, AnyVal*> child : fn_ctx->impl()->non_constant_args()) {
-    void* val = context->GetValue(child.first, row);
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
+  FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);
+  for (pair<ScalarExpr*, AnyVal*> child : fn_ctx->impl()->non_constant_args()) 
{
+    void* val = eval->GetValue(*(child.first), row);
     AnyValUtil::SetAnyVal(val, child.first->type(), child.second);
   }
 }
 
 template<typename RETURN_TYPE>
-RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* context, const TupleRow* 
row) {
+RETURN_TYPE ScalarFnCall::InterpretEval(ScalarExprEvaluator* eval,
+    const TupleRow* row) const {
   DCHECK(scalar_fn_ != NULL) << DebugString();
-  FunctionContext* fn_ctx = context->fn_context(fn_context_index_);
+  FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);
   vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals();
-  EvaluateNonConstantChildren(context, row);
+  EvaluateNonConstantChildren(eval, row);
 
   if (vararg_start_idx_ == -1) {
     switch (children_.size()) {
@@ -480,7 +477,7 @@ RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* 
context, const TupleRow* ro
    // case X:
    //     typedef RETURN_TYPE (*ScalarFnX)(FunctionContext*, const AnyVal& a1, 
...,
    //         const AnyVal& aX);
-   //     return reinterpret_cast<ScalarFnn>(scalar_fn_)(fn_ctx, 
*(*input_vals)[0], ...,
+   //     return reinterpret_cast<ScalarFnX>(scalar_fn_)(fn_ctx, 
*(*input_vals)[0], ...,
    //         *(*input_vals)[X-1]);
 #define SCALAR_FN_TYPE(n) BOOST_PP_CAT(ScalarFn, n)
 #define INTERP_SCALAR_FN(z, n, unused)                                       \
@@ -524,102 +521,112 @@ RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* 
context, const TupleRow* ro
   return RETURN_TYPE::null();
 }
 
-typedef BooleanVal (*BooleanWrapper)(ExprContext*, const TupleRow*);
-typedef TinyIntVal (*TinyIntWrapper)(ExprContext*, const TupleRow*);
-typedef SmallIntVal (*SmallIntWrapper)(ExprContext*, const TupleRow*);
-typedef IntVal (*IntWrapper)(ExprContext*, const TupleRow*);
-typedef BigIntVal (*BigIntWrapper)(ExprContext*, const TupleRow*);
-typedef FloatVal (*FloatWrapper)(ExprContext*, const TupleRow*);
-typedef DoubleVal (*DoubleWrapper)(ExprContext*, const TupleRow*);
-typedef StringVal (*StringWrapper)(ExprContext*, const TupleRow*);
-typedef TimestampVal (*TimestampWrapper)(ExprContext*, const TupleRow*);
-typedef DecimalVal (*DecimalWrapper)(ExprContext*, const TupleRow*);
+typedef BooleanVal (*BooleanWrapper)(ScalarExprEvaluator*, const TupleRow*);
+typedef TinyIntVal (*TinyIntWrapper)(ScalarExprEvaluator*, const TupleRow*);
+typedef SmallIntVal (*SmallIntWrapper)(ScalarExprEvaluator*, const TupleRow*);
+typedef IntVal (*IntWrapper)(ScalarExprEvaluator*, const TupleRow*);
+typedef BigIntVal (*BigIntWrapper)(ScalarExprEvaluator*, const TupleRow*);
+typedef FloatVal (*FloatWrapper)(ScalarExprEvaluator*, const TupleRow*);
+typedef DoubleVal (*DoubleWrapper)(ScalarExprEvaluator*, const TupleRow*);
+typedef StringVal (*StringWrapper)(ScalarExprEvaluator*, const TupleRow*);
+typedef TimestampVal (*TimestampWrapper)(ScalarExprEvaluator*, const 
TupleRow*);
+typedef DecimalVal (*DecimalWrapper)(ScalarExprEvaluator*, const TupleRow*);
 
 // TODO: macroify this?
-BooleanVal ScalarFnCall::GetBooleanVal(ExprContext* context, const TupleRow* 
row) {
+BooleanVal ScalarFnCall::GetBooleanVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_BOOLEAN);
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<BooleanVal>(context, 
row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<BooleanVal>(eval, row);
   BooleanWrapper fn = reinterpret_cast<BooleanWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
-TinyIntVal ScalarFnCall::GetTinyIntVal(ExprContext* context, const TupleRow* 
row) {
+TinyIntVal ScalarFnCall::GetTinyIntVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_TINYINT);
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<TinyIntVal>(context, 
row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<TinyIntVal>(eval, row);
   TinyIntWrapper fn = reinterpret_cast<TinyIntWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
-SmallIntVal ScalarFnCall::GetSmallIntVal(ExprContext* context, const TupleRow* 
row) {
+SmallIntVal ScalarFnCall::GetSmallIntVal(
+     ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_SMALLINT);
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<SmallIntVal>(context, 
row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<SmallIntVal>(eval, row);
   SmallIntWrapper fn = reinterpret_cast<SmallIntWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
-IntVal ScalarFnCall::GetIntVal(ExprContext* context, const TupleRow* row) {
+IntVal ScalarFnCall::GetIntVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_INT);
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<IntVal>(context, row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<IntVal>(eval, row);
   IntWrapper fn = reinterpret_cast<IntWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
-BigIntVal ScalarFnCall::GetBigIntVal(ExprContext* context, const TupleRow* 
row) {
+BigIntVal ScalarFnCall::GetBigIntVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_BIGINT);
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<BigIntVal>(context, 
row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<BigIntVal>(eval, row);
   BigIntWrapper fn = reinterpret_cast<BigIntWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
-FloatVal ScalarFnCall::GetFloatVal(ExprContext* context, const TupleRow* row) {
+FloatVal ScalarFnCall::GetFloatVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_FLOAT);
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<FloatVal>(context, row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<FloatVal>(eval, row);
   FloatWrapper fn = reinterpret_cast<FloatWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
-DoubleVal ScalarFnCall::GetDoubleVal(ExprContext* context, const TupleRow* 
row) {
+DoubleVal ScalarFnCall::GetDoubleVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_DOUBLE);
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<DoubleVal>(context, 
row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<DoubleVal>(eval, row);
   DoubleWrapper fn = reinterpret_cast<DoubleWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
-StringVal ScalarFnCall::GetStringVal(ExprContext* context, const TupleRow* 
row) {
+StringVal ScalarFnCall::GetStringVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK(type_.IsStringType());
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<StringVal>(context, 
row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<StringVal>(eval, row);
   StringWrapper fn = reinterpret_cast<StringWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
-TimestampVal ScalarFnCall::GetTimestampVal(ExprContext* context, const 
TupleRow* row) {
+TimestampVal ScalarFnCall::GetTimestampVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_TIMESTAMP);
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<TimestampVal>(context, 
row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<TimestampVal>(eval, 
row);
   TimestampWrapper fn = reinterpret_cast<TimestampWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
-DecimalVal ScalarFnCall::GetDecimalVal(ExprContext* context, const TupleRow* 
row) {
+DecimalVal ScalarFnCall::GetDecimalVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_DECIMAL);
-  DCHECK(context != NULL);
-  if (scalar_fn_wrapper_ == NULL) return InterpretEval<DecimalVal>(context, 
row);
+  DCHECK(eval != NULL);
+  if (scalar_fn_wrapper_ == NULL) return InterpretEval<DecimalVal>(eval, row);
   DecimalWrapper fn = reinterpret_cast<DecimalWrapper>(scalar_fn_wrapper_);
-  return fn(context, row);
+  return fn(eval, row);
 }
 
 string ScalarFnCall::DebugString() const {
   stringstream out;
   out << "ScalarFnCall(udf_type=" << fn_.binary_type << " location=" << 
fn_.hdfs_location
-      << " symbol_name=" << fn_.scalar_fn.symbol << Expr::DebugString() << ")";
+      << " symbol_name=" << fn_.scalar_fn.symbol <<  ScalarExpr::DebugString() 
<< ")";
   return out.str();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/scalar-fn-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.h b/be/src/exprs/scalar-fn-call.h
index ffb9a2f..563f91d 100644
--- a/be/src/exprs/scalar-fn-call.h
+++ b/be/src/exprs/scalar-fn-call.h
@@ -21,15 +21,28 @@
 
 #include <string>
 
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "udf/udf.h"
 
-using namespace impala_udf;
-
 namespace impala {
 
+using impala_udf::FunctionContext;
+using impala_udf::AnyVal;
+using impala_udf::BooleanVal;
+using impala_udf::TinyIntVal;
+using impala_udf::SmallIntVal;
+using impala_udf::IntVal;
+using impala_udf::BigIntVal;
+using impala_udf::FloatVal;
+using impala_udf::DoubleVal;
+using impala_udf::TimestampVal;
+using impala_udf::StringVal;
+using impala_udf::DecimalVal;
+
+class ScalarExprEvaluator;
 class TExprNode;
 
+///
 /// Expr for evaluating a pre-compiled native or LLVM IR function that uses 
the UDF
 /// interface (i.e. a scalar function). This class overrides 
GetCodegendComputeFn() to
 /// return a function that calls any child exprs and passes the results as 
arguments to the
@@ -41,46 +54,58 @@ class TExprNode;
 /// every possible function signature, codegen may be required to generate the 
call to the
 /// function even if codegen is disabled. Codegen will also be used for IR 
UDFs (note that
 /// there is no way to specify both a native and IR library for a single UDF).
-//
+///
+/// Scalar function call: An expr that returns a single scalar value and can be
+/// implemented using the UDF interface. Note that this includes builtins, 
which although
+/// not being user-defined still use the same interface as UDFs (i.e., they are
+/// implemented as functions with signature "*Val (FunctionContext*, *Val, 
*Val...)").
+///
 /// TODO:
 /// - Fix error reporting, e.g. reporting leaks
 /// - Testing
 ///    - Test cancellation
 ///    - Type descs in UDA test harness
 ///    - Allow more functions to be NULL in UDA test harness
-class ScalarFnCall : public Expr {
+class ScalarFnCall : public ScalarExpr {
  public:
-  virtual std::string DebugString() const;
+  virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** 
fn)
+      override WARN_UNUSED_RESULT;
+  virtual std::string DebugString() const override;
 
  protected:
-  friend class Expr;
-  friend class RuntimeState;
+  friend class ScalarExpr;
+  friend class ScalarExprEvaluator;
+
+  virtual bool HasFnCtx() const override { return true; }
 
   ScalarFnCall(const TExprNode& node);
-  virtual Status Prepare(RuntimeState* state, const RowDescriptor& desc,
-                         ExprContext* context);
-  virtual Status Open(RuntimeState* state, ExprContext* context,
-      FunctionContext::FunctionStateScope scope = 
FunctionContext::FRAGMENT_LOCAL);
-  virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** 
fn);
-  virtual void Close(RuntimeState* state, ExprContext* context,
-      FunctionContext::FunctionStateScope scope = 
FunctionContext::FRAGMENT_LOCAL);
-
-  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow*);
-  virtual TinyIntVal GetTinyIntVal(ExprContext* context, const TupleRow*);
-  virtual SmallIntVal GetSmallIntVal(ExprContext* context, const TupleRow*);
-  virtual IntVal GetIntVal(ExprContext* context, const TupleRow*);
-  virtual BigIntVal GetBigIntVal(ExprContext* context, const TupleRow*);
-  virtual FloatVal GetFloatVal(ExprContext* context, const TupleRow*);
-  virtual DoubleVal GetDoubleVal(ExprContext* context, const TupleRow*);
-  virtual StringVal GetStringVal(ExprContext* context, const TupleRow*);
-  virtual TimestampVal GetTimestampVal(ExprContext* context, const TupleRow*);
-  virtual DecimalVal GetDecimalVal(ExprContext* context, const TupleRow*);
+  virtual Status Init(const RowDescriptor& row_desc, RuntimeState* state)
+      override WARN_UNUSED_RESULT;
+  virtual Status OpenEvaluator(FunctionContext::FunctionStateScope scope,
+      RuntimeState* state, ScalarExprEvaluator* eval) const override
+      WARN_UNUSED_RESULT;
+  virtual void CloseEvaluator(FunctionContext::FunctionStateScope scope,
+      RuntimeState* state, ScalarExprEvaluator* eval) const override;
+  virtual int ComputeVarArgsBufferSize() const override;
+
+  virtual BooleanVal GetBooleanVal(ScalarExprEvaluator*, const TupleRow*) 
const override;
+  virtual TinyIntVal GetTinyIntVal(ScalarExprEvaluator*, const TupleRow*) 
const override;
+  virtual SmallIntVal GetSmallIntVal(
+      ScalarExprEvaluator*, const TupleRow*) const override;
+  virtual IntVal GetIntVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual BigIntVal GetBigIntVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual FloatVal GetFloatVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual DoubleVal GetDoubleVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual StringVal GetStringVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual TimestampVal GetTimestampVal(
+      ScalarExprEvaluator*, const TupleRow*) const override;
+  virtual DecimalVal GetDecimalVal(ScalarExprEvaluator*, const TupleRow*) 
const override;
 
  private:
   /// If this function has var args, children()[vararg_start_idx_] is the 
first vararg
   /// argument.
   /// If this function does not have varargs, it is set to -1.
-  int vararg_start_idx_;
+  const int vararg_start_idx_;
 
   /// Vector of all non-constant children expressions that need to be 
evaluated for
   /// each input row. The first element of each pair is the child expression 
and the
@@ -88,18 +113,18 @@ class ScalarFnCall : public Expr {
   std::vector<std::pair<Expr*, impala_udf::AnyVal*>> non_constant_children_;
 
   /// Function pointer to the JIT'd function produced by 
GetCodegendComputeFn().
-  /// Has signature *Val (ExprContext*, const TupleRow*), and calls the scalar
+  /// Has signature *Val (ScalarExprEvaluator*, const TupleRow*), and calls 
the scalar
   /// function with signature like *Val (FunctionContext*, const *Val& arg1, 
...)
   void* scalar_fn_wrapper_;
 
   /// The UDF's prepare function, if specified. This is initialized in 
Prepare() and
   /// called in Open() (since we may have needed to codegen the function if 
it's from an
   /// IR module).
-  UdfPrepare prepare_fn_;
+  impala_udf::UdfPrepare prepare_fn_;
 
   /// THe UDF's close function, if specified. This is initialized in Prepare() 
and called
   /// in Close().
-  UdfClose close_fn_;
+  impala_udf::UdfClose close_fn_;
 
   /// If running with codegen disabled, scalar_fn_ will be a pointer to the 
non-JIT'd
   /// scalar function.
@@ -110,7 +135,7 @@ class ScalarFnCall : public Expr {
     return vararg_start_idx_ >= 0 ? vararg_start_idx_ : children_.size();
   }
 
-  int NumVarArgs() const { return children_.size() - NumFixedArgs(); }
+  virtual int NumVarArgs() const { return children_.size() - NumFixedArgs(); }
 
   const ColumnType& VarArgsType() const {
     DCHECK_GE(NumVarArgs(), 1);
@@ -120,22 +145,21 @@ class ScalarFnCall : public Expr {
   /// Loads the native or IR function 'symbol' from HDFS and puts the result 
in *fn.
   /// If the function is loaded from an IR module, it cannot be called until 
the module
   /// has been JIT'd (i.e. after GetCodegendComputeFn() has been called).
-  Status GetFunction(LlvmCodeGen* codegen, const std::string& symbol, void** 
fn);
+  Status GetFunction(LlvmCodeGen* codegen, const std::string& symbol, void** 
fn)
+      WARN_UNUSED_RESULT;
 
   /// Loads the Prepare() and Close() functions for this ScalarFnCall. They 
could be
   /// native or IR functions. To load IR functions, the codegen object must 
have
   /// been created and any external LLVM module must have been linked already.
-  Status LoadPrepareAndCloseFn(LlvmCodeGen* codegen);
+  Status LoadPrepareAndCloseFn(LlvmCodeGen* codegen) WARN_UNUSED_RESULT;
 
   /// Evaluates the non-constant children exprs. Used in the interpreted path.
-  void EvaluateNonConstantChildren(ExprContext* context, const TupleRow* row);
+  void EvaluateNonConstantChildren(
+      ScalarExprEvaluator* eval, const TupleRow* row) const;
 
   /// Function to call scalar_fn_. Used in the interpreted path.
   template <typename RETURN_TYPE>
-  RETURN_TYPE InterpretEval(ExprContext* context, const TupleRow* row);
-
-  /// Computes the size of the varargs buffer in bytes (0 bytes if no varargs).
-  int ComputeVarArgsBufferSize() const;
+  RETURN_TYPE InterpretEval(ScalarExprEvaluator* eval, const TupleRow* row) 
const;
 };
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/slot-ref.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/slot-ref.cc b/be/src/exprs/slot-ref.cc
index 3465ff4..49931c3 100644
--- a/be/src/exprs/slot-ref.cc
+++ b/be/src/exprs/slot-ref.cc
@@ -22,6 +22,7 @@
 
 #include "codegen/codegen-anyval.h"
 #include "codegen/llvm-codegen.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "gen-cpp/Exprs_types.h"
 #include "runtime/collection-value.h"
 #include "runtime/decimal-value.h"
@@ -39,7 +40,7 @@ using namespace llvm;
 namespace impala {
 
 SlotRef::SlotRef(const TExprNode& node)
-  : Expr(node, true),
+  : ScalarExpr(node),
     slot_offset_(-1),  // invalid
     null_indicator_offset_(0, 0),
     slot_id_(node.slot_ref.slot_id) {
@@ -47,7 +48,7 @@ SlotRef::SlotRef(const TExprNode& node)
 }
 
 SlotRef::SlotRef(const SlotDescriptor* desc)
-  : Expr(desc->type(), false, true),
+  : ScalarExpr(desc->type(), false),
     slot_offset_(-1),
     null_indicator_offset_(0, 0),
     slot_id_(desc->id()) {
@@ -55,48 +56,48 @@ SlotRef::SlotRef(const SlotDescriptor* desc)
 }
 
 SlotRef::SlotRef(const SlotDescriptor* desc, const ColumnType& type)
-  : Expr(type, false, true),
+  : ScalarExpr(type, false),
     slot_offset_(-1),
     null_indicator_offset_(0, 0),
     slot_id_(desc->id()) {
     // slot_/null_indicator_offset_ are set in Prepare()
 }
 
-  SlotRef::SlotRef(const ColumnType& type, int offset, const bool nullable /* 
= false */)
-  : Expr(type, false, true),
+SlotRef::SlotRef(const ColumnType& type, int offset, const bool nullable /* = 
false */)
+  : ScalarExpr(type, false),
     tuple_idx_(0),
     slot_offset_(offset),
     null_indicator_offset_(0, nullable ? offset : -1),
     slot_id_(-1) {
 }
 
-Status SlotRef::Prepare(RuntimeState* state, const RowDescriptor& row_desc,
-                        ExprContext* context) {
+Status SlotRef::Init(const RowDescriptor& row_desc, RuntimeState* state) {
   DCHECK_EQ(children_.size(), 0);
-  if (slot_id_ == -1) return Status::OK();
-
-  const SlotDescriptor* slot_desc  = 
state->desc_tbl().GetSlotDescriptor(slot_id_);
-  if (slot_desc == NULL) {
-    // TODO: create macro MAKE_ERROR() that returns a stream
-    stringstream error;
-    error << "couldn't resolve slot descriptor " << slot_id_;
-    LOG(INFO) << error.str();
-    return Status(error.str());
-  }
-  tuple_idx_ = row_desc.GetTupleIdx(slot_desc->parent()->id());
-  if (tuple_idx_ == RowDescriptor::INVALID_IDX) {
-    TupleDescriptor* d = 
state->desc_tbl().GetTupleDescriptor(slot_desc->parent()->id());
-    LOG(INFO) << "invalid idx: " << slot_desc->DebugString()
-              << "\nparent=" << d->DebugString()
-              << "\nrow=" << row_desc.DebugString();
-    stringstream error;
-    error << "invalid tuple_idx";
-    return Status(error.str());
+  if (slot_id_ != -1) {
+    const SlotDescriptor* slot_desc = 
state->desc_tbl().GetSlotDescriptor(slot_id_);
+    if (slot_desc == NULL) {
+      // TODO: create macro MAKE_ERROR() that returns a stream
+      stringstream error;
+      error << "couldn't resolve slot descriptor " << slot_id_;
+      LOG(INFO) << error.str();
+      return Status(error.str());
+    }
+    tuple_idx_ = row_desc.GetTupleIdx(slot_desc->parent()->id());
+    if (tuple_idx_ == RowDescriptor::INVALID_IDX) {
+      TupleDescriptor* d =
+          state->desc_tbl().GetTupleDescriptor(slot_desc->parent()->id());
+      LOG(INFO) << "invalid idx: " << slot_desc->DebugString()
+                << "\nparent=" << d->DebugString()
+                << "\nrow=" << row_desc.DebugString();
+      stringstream error;
+      error << "invalid tuple_idx";
+      return Status(error.str());
+    }
+    DCHECK(tuple_idx_ != RowDescriptor::INVALID_IDX);
+    tuple_is_nullable_ = row_desc.TupleIsNullable(tuple_idx_);
+    slot_offset_ = slot_desc->tuple_offset();
+    null_indicator_offset_ = slot_desc->null_indicator_offset();
   }
-  DCHECK(tuple_idx_ != RowDescriptor::INVALID_IDX);
-  tuple_is_nullable_ = row_desc.TupleIsNullable(tuple_idx_);
-  slot_offset_ = slot_desc->tuple_offset();
-  null_indicator_offset_ = slot_desc->null_indicator_offset();
   return Status::OK();
 }
 
@@ -111,7 +112,7 @@ string SlotRef::DebugString() const {
       << " tuple_idx=" << tuple_idx_ << " slot_offset=" << slot_offset_
       << " tuple_is_nullable=" << tuple_is_nullable_
       << " null_indicator=" << null_indicator_offset_
-      << Expr::DebugString() << ")";
+      << ScalarExpr::DebugString() << ")";
   return out.str();
 }
 
@@ -185,7 +186,7 @@ Status SlotRef::GetCodegendComputeFn(LlvmCodeGen* codegen, 
llvm::Function** fn)
 
   LLVMContext& context = codegen->context();
   Value* args[2];
-  *fn = CreateIrFunctionPrototype(codegen, "GetSlotRef", &args);
+  *fn = CreateIrFunctionPrototype("GetSlotRef", codegen, &args);
   Value* row_ptr = args[1];
 
   Value* tuple_offset = ConstantInt::get(codegen->int_type(), tuple_idx_);
@@ -358,61 +359,70 @@ Status SlotRef::GetCodegendComputeFn(LlvmCodeGen* 
codegen, llvm::Function** fn)
   }
 
   *fn = codegen->FinalizeFunction(*fn);
-  codegen->RegisterExprFn(unique_slot_id, *fn);
+  if (UNLIKELY(*fn == NULL)) return Status(TErrorCode::IR_VERIFY_FAILED, 
"SlotRef");
   ir_compute_fn_ = *fn;
+  codegen->RegisterExprFn(unique_slot_id, *fn);
   return Status::OK();
 }
 
-BooleanVal SlotRef::GetBooleanVal(ExprContext* context, const TupleRow* row) {
+BooleanVal SlotRef::GetBooleanVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_BOOLEAN);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
BooleanVal::null();
   return BooleanVal(*reinterpret_cast<bool*>(t->GetSlot(slot_offset_)));
 }
 
-TinyIntVal SlotRef::GetTinyIntVal(ExprContext* context, const TupleRow* row) {
+TinyIntVal SlotRef::GetTinyIntVal(
+   ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_TINYINT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
TinyIntVal::null();
   return TinyIntVal(*reinterpret_cast<int8_t*>(t->GetSlot(slot_offset_)));
 }
 
-SmallIntVal SlotRef::GetSmallIntVal(ExprContext* context, const TupleRow* row) 
{
+SmallIntVal SlotRef::GetSmallIntVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_SMALLINT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
SmallIntVal::null();
   return SmallIntVal(*reinterpret_cast<int16_t*>(t->GetSlot(slot_offset_)));
 }
 
-IntVal SlotRef::GetIntVal(ExprContext* context, const TupleRow* row) {
+IntVal SlotRef::GetIntVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_INT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return IntVal::null();
   return IntVal(*reinterpret_cast<int32_t*>(t->GetSlot(slot_offset_)));
 }
 
-BigIntVal SlotRef::GetBigIntVal(ExprContext* context, const TupleRow* row) {
+BigIntVal SlotRef::GetBigIntVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_BIGINT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return BigIntVal::null();
   return BigIntVal(*reinterpret_cast<int64_t*>(t->GetSlot(slot_offset_)));
 }
 
-FloatVal SlotRef::GetFloatVal(ExprContext* context, const TupleRow* row) {
+FloatVal SlotRef::GetFloatVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_FLOAT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return FloatVal::null();
   return FloatVal(*reinterpret_cast<float*>(t->GetSlot(slot_offset_)));
 }
 
-DoubleVal SlotRef::GetDoubleVal(ExprContext* context, const TupleRow* row) {
+DoubleVal SlotRef::GetDoubleVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_DOUBLE);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return DoubleVal::null();
   return DoubleVal(*reinterpret_cast<double*>(t->GetSlot(slot_offset_)));
 }
 
-StringVal SlotRef::GetStringVal(ExprContext* context, const TupleRow* row) {
+StringVal SlotRef::GetStringVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK(type_.IsStringType());
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return StringVal::null();
@@ -428,7 +438,8 @@ StringVal SlotRef::GetStringVal(ExprContext* context, const 
TupleRow* row) {
   return result;
 }
 
-TimestampVal SlotRef::GetTimestampVal(ExprContext* context, const TupleRow* 
row) {
+TimestampVal SlotRef::GetTimestampVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_TIMESTAMP);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
TimestampVal::null();
@@ -438,7 +449,8 @@ TimestampVal SlotRef::GetTimestampVal(ExprContext* context, 
const TupleRow* row)
   return result;
 }
 
-DecimalVal SlotRef::GetDecimalVal(ExprContext* context, const TupleRow* row) {
+DecimalVal SlotRef::GetDecimalVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK_EQ(type_.type, TYPE_DECIMAL);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
DecimalVal::null();
@@ -455,7 +467,8 @@ DecimalVal SlotRef::GetDecimalVal(ExprContext* context, 
const TupleRow* row) {
   }
 }
 
-CollectionVal SlotRef::GetCollectionVal(ExprContext* context, const TupleRow* 
row) {
+CollectionVal SlotRef::GetCollectionVal(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
   DCHECK(type_.IsCollectionType());
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
CollectionVal::null();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/slot-ref.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/slot-ref.h b/be/src/exprs/slot-ref.h
index e86617d..82d25ef 100644
--- a/be/src/exprs/slot-ref.h
+++ b/be/src/exprs/slot-ref.h
@@ -18,13 +18,24 @@
 #ifndef IMPALA_EXPRS_SLOTREF_H
 #define IMPALA_EXPRS_SLOTREF_H
 
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
 
 namespace impala {
 
+using impala_udf::BooleanVal;
+using impala_udf::TinyIntVal;
+using impala_udf::SmallIntVal;
+using impala_udf::IntVal;
+using impala_udf::BigIntVal;
+using impala_udf::FloatVal;
+using impala_udf::DoubleVal;
+using impala_udf::TimestampVal;
+using impala_udf::StringVal;
+using impala_udf::DecimalVal;
+
 /// Reference to a single slot of a tuple.
-class SlotRef : public Expr {
+class SlotRef : public ScalarExpr {
  public:
   SlotRef(const TExprNode& node);
   SlotRef(const SlotDescriptor* desc);
@@ -36,27 +47,36 @@ class SlotRef : public Expr {
   /// Used for testing.  GetValue will return tuple + offset interpreted as 
'type'
   SlotRef(const ColumnType& type, int offset, const bool nullable = false);
 
-  virtual Status Prepare(
-      RuntimeState* state, const RowDescriptor& row_desc, ExprContext* 
context);
-  virtual std::string DebugString() const;
-  virtual int GetSlotIds(std::vector<SlotId>* slot_ids) const;
+  /// Exposed as public so AGG node can initialize its build expressions.
+  virtual Status Init(const RowDescriptor& row_desc, RuntimeState* state)
+      override WARN_UNUSED_RESULT;
+  virtual std::string DebugString() const override;
+  virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** 
fn)
+      override WARN_UNUSED_RESULT;
+  virtual bool IsSlotRef() const override { return true; }
+  virtual int GetSlotIds(std::vector<SlotId>* slot_ids) const override;
   const SlotId& slot_id() const { return slot_id_; }
 
-  virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** 
fn);
+ protected:
+  friend class ScalarExpr;
+  friend class ScalarExprEvaluator;
 
-  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext* context, const 
TupleRow*);
-  virtual impala_udf::TinyIntVal GetTinyIntVal(ExprContext* context, const 
TupleRow*);
-  virtual impala_udf::SmallIntVal GetSmallIntVal(ExprContext* context, const 
TupleRow*);
-  virtual impala_udf::IntVal GetIntVal(ExprContext* context, const TupleRow*);
-  virtual impala_udf::BigIntVal GetBigIntVal(ExprContext* context, const 
TupleRow*);
-  virtual impala_udf::FloatVal GetFloatVal(ExprContext* context, const 
TupleRow*);
-  virtual impala_udf::DoubleVal GetDoubleVal(ExprContext* context, const 
TupleRow*);
-  virtual impala_udf::StringVal GetStringVal(ExprContext* context, const 
TupleRow*);
-  virtual impala_udf::TimestampVal GetTimestampVal(ExprContext* context, const 
TupleRow*);
-  virtual impala_udf::DecimalVal GetDecimalVal(ExprContext* context, const 
TupleRow*);
-  virtual impala_udf::CollectionVal GetCollectionVal(ExprContext* context, 
const TupleRow*);
+  virtual BooleanVal GetBooleanVal(ScalarExprEvaluator*, const TupleRow*) 
const override;
+  virtual TinyIntVal GetTinyIntVal(ScalarExprEvaluator*, const TupleRow*) 
const override;
+  virtual SmallIntVal GetSmallIntVal(
+      ScalarExprEvaluator*, const TupleRow*) const override;
+  virtual IntVal GetIntVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual BigIntVal GetBigIntVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual FloatVal GetFloatVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual DoubleVal GetDoubleVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual StringVal GetStringVal(ScalarExprEvaluator*, const TupleRow*) const 
override;
+  virtual TimestampVal GetTimestampVal(
+      ScalarExprEvaluator*, const TupleRow*) const override;
+  virtual DecimalVal GetDecimalVal(ScalarExprEvaluator*, const TupleRow*) 
const override;
+  virtual CollectionVal GetCollectionVal(
+      ScalarExprEvaluator*, const TupleRow*) const override;
 
- protected:
+ private:
   int tuple_idx_;  // within row
   int slot_offset_;  // within tuple
   NullIndicatorOffset null_indicator_offset_;  // within tuple

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/string-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/string-functions-ir.cc 
b/be/src/exprs/string-functions-ir.cc
index 0a05739..09e19fb 100644
--- a/be/src/exprs/string-functions-ir.cc
+++ b/be/src/exprs/string-functions-ir.cc
@@ -26,7 +26,7 @@
 #include <boost/static_assert.hpp>
 
 #include "exprs/anyval-util.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/tuple-row.h"
 #include "util/bit-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/string-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/string-functions.h b/be/src/exprs/string-functions.h
index 3dad5f6..86fc547 100644
--- a/be/src/exprs/string-functions.h
+++ b/be/src/exprs/string-functions.h
@@ -28,6 +28,19 @@ using namespace impala_udf;
 
 namespace impala {
 
+using impala_udf::FunctionContext;
+using impala_udf::AnyVal;
+using impala_udf::BooleanVal;
+using impala_udf::TinyIntVal;
+using impala_udf::SmallIntVal;
+using impala_udf::IntVal;
+using impala_udf::BigIntVal;
+using impala_udf::FloatVal;
+using impala_udf::DoubleVal;
+using impala_udf::TimestampVal;
+using impala_udf::StringVal;
+using impala_udf::DecimalVal;
+
 class Expr;
 class OpcodeRegistry;
 class TupleRow;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/timestamp-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/timestamp-functions.h 
b/be/src/exprs/timestamp-functions.h
index b02e7bd..cbd8b98 100644
--- a/be/src/exprs/timestamp-functions.h
+++ b/be/src/exprs/timestamp-functions.h
@@ -22,10 +22,21 @@
 #include "common/status.h"
 #include "udf/udf.h"
 
-using namespace impala_udf;
-
 namespace impala {
 
+using impala_udf::FunctionContext;
+using impala_udf::AnyVal;
+using impala_udf::BooleanVal;
+using impala_udf::TinyIntVal;
+using impala_udf::SmallIntVal;
+using impala_udf::IntVal;
+using impala_udf::BigIntVal;
+using impala_udf::FloatVal;
+using impala_udf::DoubleVal;
+using impala_udf::TimestampVal;
+using impala_udf::StringVal;
+using impala_udf::DecimalVal;
+
 class Expr;
 class OpcodeRegistry;
 struct StringValue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/tuple-is-null-predicate.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/tuple-is-null-predicate.cc 
b/be/src/exprs/tuple-is-null-predicate.cc
index dd20e8a..2762247 100644
--- a/be/src/exprs/tuple-is-null-predicate.cc
+++ b/be/src/exprs/tuple-is-null-predicate.cc
@@ -27,7 +27,8 @@
 
 namespace impala {
 
-BooleanVal TupleIsNullPredicate::GetBooleanVal(ExprContext* ctx, const 
TupleRow* row) {
+BooleanVal TupleIsNullPredicate::GetBooleanVal(
+    ScalarExprEvaluator* evaluator, const TupleRow* row) const {
   int count = 0;
   for (int i = 0; i < tuple_idxs_.size(); ++i) {
     count += row->GetTuple(tuple_idxs_[i]) == NULL;
@@ -41,9 +42,8 @@ TupleIsNullPredicate::TupleIsNullPredicate(const TExprNode& 
node)
                node.tuple_is_null_pred.tuple_ids.end()) {
 }
 
-Status TupleIsNullPredicate::Prepare(RuntimeState* state, const RowDescriptor& 
row_desc,
-                                     ExprContext* ctx) {
-  RETURN_IF_ERROR(Expr::Prepare(state, row_desc, ctx));
+Status TupleIsNullPredicate::Init(const RowDescriptor& row_desc, RuntimeState* 
state) {
+  RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state));
   DCHECK_EQ(0, children_.size());
   // Resolve tuple ids to tuple indexes.
   for (int i = 0; i < tuple_ids_.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/tuple-is-null-predicate.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/tuple-is-null-predicate.h 
b/be/src/exprs/tuple-is-null-predicate.h
index e687010..611f7e6 100644
--- a/be/src/exprs/tuple-is-null-predicate.h
+++ b/be/src/exprs/tuple-is-null-predicate.h
@@ -32,16 +32,16 @@ class TExprNode;
 /// TODO: Implement codegen to eliminate overhead on non-nullable tuples.
 class TupleIsNullPredicate: public Predicate {
  protected:
-  friend class Expr;
+  friend class ScalarExpr;
 
   TupleIsNullPredicate(const TExprNode& node);
 
-  virtual Status Prepare(RuntimeState* state, const RowDescriptor& row_desc,
-                         ExprContext* ctx);
-  virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** 
fn);
-  virtual std::string DebugString() const;
+  virtual Status Init(const RowDescriptor& row_desc, RuntimeState* state) 
override;
+  virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** 
fn)
+      override WARN_UNUSED_RESULT;
+  virtual std::string DebugString() const override;
 
-  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow* row);
+  virtual BooleanVal GetBooleanVal(ScalarExprEvaluator*, const TupleRow*) 
const override;
 
  private:
   /// Tuple ids to check for NULL. May contain ids of nullable and 
non-nullable tuples.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/udf-builtins.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/udf-builtins.h b/be/src/exprs/udf-builtins.h
index 3bbc5d2..2ef439b 100644
--- a/be/src/exprs/udf-builtins.h
+++ b/be/src/exprs/udf-builtins.h
@@ -20,10 +20,20 @@
 
 #include "udf/udf.h"
 
-using namespace impala_udf;
-
 namespace impala {
 
+using impala_udf::FunctionContext;
+using impala_udf::BooleanVal;
+using impala_udf::TinyIntVal;
+using impala_udf::SmallIntVal;
+using impala_udf::IntVal;
+using impala_udf::BigIntVal;
+using impala_udf::FloatVal;
+using impala_udf::DoubleVal;
+using impala_udf::TimestampVal;
+using impala_udf::StringVal;
+using impala_udf::DecimalVal;
+
 /// Builtins written against the UDF interface. The builtins in the other files
 /// should be replaced to the UDF interface as well.
 /// This is just to illustrate how builtins against the UDF interface will be

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/utility-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/utility-functions.h b/be/src/exprs/utility-functions.h
index a8dbc7b..41409c4 100644
--- a/be/src/exprs/utility-functions.h
+++ b/be/src/exprs/utility-functions.h
@@ -21,10 +21,21 @@
 
 #include "udf/udf.h"
 
-using namespace impala_udf;
-
 namespace impala {
 
+using impala_udf::FunctionContext;
+using impala_udf::AnyVal;
+using impala_udf::BooleanVal;
+using impala_udf::TinyIntVal;
+using impala_udf::SmallIntVal;
+using impala_udf::IntVal;
+using impala_udf::BigIntVal;
+using impala_udf::FloatVal;
+using impala_udf::DoubleVal;
+using impala_udf::TimestampVal;
+using impala_udf::StringVal;
+using impala_udf::DecimalVal;
+
 class Expr;
 class OpcodeRegistry;
 class TupleRow;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc 
b/be/src/runtime/coordinator-backend-state.cc
index cd3a741..4e2aca8 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -18,6 +18,8 @@
 #include "runtime/coordinator-backend-state.h"
 
 #include <sstream>
+#include <string>
+#include <boost/lexical_cast.hpp>
 #include <boost/thread/locks.hpp>
 #include <boost/thread/lock_guard.hpp>
 #include <boost/accumulators/accumulators.hpp>
@@ -41,6 +43,7 @@
 #include "gen-cpp/Types_types.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
+
 #include "common/names.h"
 
 using namespace impala;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 3c5ffbe..2bfe1b5 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -587,7 +587,8 @@ Status Coordinator::FinalizeSuccessfulInsert() {
   // TODO: add DescriptorTbl::CreateTableDescriptor() so we can create a
   // descriptor for just the output table, calling Create() can be very
   // expensive.
-  DescriptorTbl::Create(obj_pool(), query_ctx_.desc_tbl, nullptr, 
&descriptor_table);
+  RETURN_IF_ERROR(
+      DescriptorTbl::Create(obj_pool(), query_ctx_.desc_tbl, 
&descriptor_table));
   HdfsTableDescriptor* hdfs_table = static_cast<HdfsTableDescriptor*>(
       descriptor_table->GetTableDescriptor(finalize_params_.table_id));
   DCHECK(hdfs_table != nullptr)
@@ -697,6 +698,11 @@ Status Coordinator::FinalizeSuccessfulInsert() {
     }
   }
 
+  // Release resources on the descriptor table.
+  descriptor_table->ReleaseResources();
+  descriptor_table = nullptr;
+  hdfs_table = nullptr;
+
   {
     SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, 
"Overwrite/PartitionCreationTimer",
           "FinalizationTimer"));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 0d772eb..e3106b0 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -53,8 +53,6 @@ class RowBatch;
 class RowDescriptor;
 class ObjectPool;
 class RuntimeState;
-class Expr;
-class ExprContext;
 class ExecEnv;
 class TUpdateCatalogRequest;
 class TQueryExecRequest;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc 
b/be/src/runtime/data-stream-sender.cc
index dc98c49..a45c2c4 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -21,8 +21,8 @@
 #include <thrift/protocol/TDebugProtocol.h>
 
 #include "common/logging.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
 #include "runtime/tuple-row.h"
@@ -325,7 +325,7 @@ void DataStreamSender::Channel::Teardown(RuntimeState* 
state) {
   batch_.reset();
 }
 
-DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
+DataStreamSender::DataStreamSender(int sender_id,
     const RowDescriptor& row_desc, const TDataStreamSink& sink,
     const vector<TPlanFragmentDestination>& destinations,
     int per_channel_buffer_size)
@@ -351,8 +351,8 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int 
sender_id,
   for (int i = 0; i < destinations.size(); ++i) {
     channels_.push_back(
         new Channel(this, row_desc, destinations[i].server,
-                    destinations[i].fragment_instance_id,
-                    sink.dest_node_id, per_channel_buffer_size));
+            destinations[i].fragment_instance_id, sink.dest_node_id,
+            per_channel_buffer_size));
   }
 
   if (partition_type_ == TPartitionType::UNPARTITIONED
@@ -361,14 +361,6 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int 
sender_id,
     srand(reinterpret_cast<uint64_t>(this));
     random_shuffle(channels_.begin(), channels_.end());
   }
-
-  if (partition_type_ == TPartitionType::HASH_PARTITIONED
-      || partition_type_ == TPartitionType::KUDU) {
-    // TODO: move this to Init()? would need to save 'sink' somewhere
-    Status status = Expr::CreateExprTrees(
-        pool, sink.output_partition.partition_exprs, &partition_expr_ctxs_);
-    DCHECK(status.ok());
-  }
 }
 
 string DataStreamSender::GetName() {
@@ -383,14 +375,23 @@ DataStreamSender::~DataStreamSender() {
   }
 }
 
+Status DataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
+    const TDataSink& tsink, RuntimeState* state) {
+  DCHECK(tsink.__isset.stream_sink);
+  if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
+      partition_type_ == TPartitionType::KUDU) {
+    
RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
+        row_desc_, state, &partition_exprs_));
+  }
+  return Status::OK();
+}
+
 Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   state_ = state;
   SCOPED_TIMER(profile_->total_time_counter());
-
-  RETURN_IF_ERROR(
-      Expr::Prepare(partition_expr_ctxs_, state, row_desc_, 
mem_tracker_.get()));
-
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state,
+      state->obj_pool(), expr_mem_pool(), &partition_expr_evals_));
   bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
   uncompressed_bytes_counter_ =
       ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
@@ -414,7 +415,7 @@ Status DataStreamSender::Prepare(RuntimeState* state, 
MemTracker* parent_mem_tra
 }
 
 Status DataStreamSender::Open(RuntimeState* state) {
-  return Expr::Open(partition_expr_ctxs_, state);
+  return ScalarExprEvaluator::Open(partition_expr_evals_, state);
 }
 
 Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
@@ -422,8 +423,7 @@ Status DataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
   DCHECK(!flushed_);
 
   if (batch->num_rows() == 0) return Status::OK();
-  if (partition_type_ == TPartitionType::UNPARTITIONED
-      || channels_.size() == 1) {
+  if (partition_type_ == TPartitionType::UNPARTITIONED || channels_.size() == 
1) {
     // current_thrift_batch_ is *not* the one that was written by the last call
     // to Serialize()
     RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, 
channels_.size()));
@@ -443,12 +443,12 @@ Status DataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
     
RETURN_IF_ERROR(current_channel->SendBatch(current_channel->thrift_batch()));
     current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size();
   } else if (partition_type_ == TPartitionType::KUDU) {
-    DCHECK_EQ(partition_expr_ctxs_.size(), 1);
+    DCHECK_EQ(partition_expr_evals_.size(), 1);
     int num_channels = channels_.size();
     for (int i = 0; i < batch->num_rows(); ++i) {
       TupleRow* row = batch->GetRow(i);
       int32_t partition =
-          *reinterpret_cast<int32_t*>(partition_expr_ctxs_[0]->GetValue(row));
+          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
       if (partition < 0) {
         // This row doesn't coorespond to a partition, e.g. it's outside the 
given ranges.
         partition = next_unknown_partition_;
@@ -465,17 +465,18 @@ Status DataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
     for (int i = 0; i < batch->num_rows(); ++i) {
       TupleRow* row = batch->GetRow(i);
       uint32_t hash_val = HashUtil::FNV_SEED;
-      for (int i = 0; i < partition_expr_ctxs_.size(); ++i) {
-        ExprContext* ctx = partition_expr_ctxs_[i];
-        void* partition_val = ctx->GetValue(row);
+      for (int i = 0; i < partition_exprs_.size(); ++i) {
+        ScalarExprEvaluator* eval = partition_expr_evals_[i];
+        void* partition_val = eval->GetValue(row);
         // We can't use the crc hash function here because it does not result
         // in uncorrelated hashes with different seeds.  Instead we must use
         // fnv hash.
         // TODO: fix crc hash/GetHashValue()
-        hash_val =
-            RawValue::GetHashValueFnv(partition_val, ctx->root()->type(), 
hash_val);
+        DCHECK(&partition_expr_evals_[i]->root() == partition_exprs_[i]);
+        hash_val = RawValue::GetHashValueFnv(
+            partition_val, partition_exprs_[i]->type(), hash_val);
       }
-      ExprContext::FreeLocalAllocations(partition_expr_ctxs_);
+      ScalarExprEvaluator::FreeLocalAllocations(partition_expr_evals_);
       RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
     }
   }
@@ -502,7 +503,8 @@ void DataStreamSender::Close(RuntimeState* state) {
   for (int i = 0; i < channels_.size(); ++i) {
     channels_[i]->Teardown(state);
   }
-  Expr::Close(partition_expr_ctxs_, state);
+  ScalarExprEvaluator::Close(partition_expr_evals_, state);
+  ScalarExpr::Close(partition_exprs_);
   DataSink::Close(state);
   closed_ = true;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h 
b/be/src/runtime/data-stream-sender.h
index d157577..0719daf 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -31,10 +31,9 @@
 
 namespace impala {
 
-class Expr;
-class ExprContext;
 class RowBatch;
 class RowDescriptor;
+
 class MemTracker;
 class TDataStreamSink;
 class TNetworkAddress;
@@ -47,6 +46,7 @@ class TPlanFragmentDestination;
 //
 /// TODO: capture stats that describe distribution of rows/data volume
 /// across channels.
+/// TODO: create a PlanNode equivalent class for DataSink.
 class DataStreamSender : public DataSink {
  public:
   /// Construct a sender according to the output specification (sink),
@@ -57,10 +57,11 @@ class DataStreamSender : public DataSink {
   /// The RowDescriptor must live until Close() is called.
   /// NOTE: supported partition types are UNPARTITIONED (broadcast), 
HASH_PARTITIONED,
   /// and RANDOM.
-  DataStreamSender(ObjectPool* pool, int sender_id,
-    const RowDescriptor& row_desc, const TDataStreamSink& sink,
-    const std::vector<TPlanFragmentDestination>& destinations,
-    int per_channel_buffer_size);
+  DataStreamSender(int sender_id, const RowDescriptor& row_desc,
+      const TDataStreamSink& tsink,
+      const std::vector<TPlanFragmentDestination>& destinations,
+      int per_channel_buffer_size);
+
   virtual ~DataStreamSender();
 
   virtual std::string GetName();
@@ -98,6 +99,12 @@ class DataStreamSender : public DataSink {
   /// broadcast to multiple receivers, they are counted once per receiver.
   int64_t GetNumDataBytesSent() const;
 
+ protected:
+  friend class DataStreamTest;
+
+  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
+      const TDataSink& tsink, RuntimeState* state);
+
  private:
   class Channel;
 
@@ -120,9 +127,13 @@ class DataStreamSender : public DataSink {
   TRowBatch thrift_batch2_;
   TRowBatch* current_thrift_batch_;  // the next one to fill in Send()
 
-  std::vector<ExprContext*> partition_expr_ctxs_;  // compute per-row 
partition values
   std::vector<Channel*> channels_;
 
+  /// Expressions of the partition keys. They are used to compute the
+  /// per-row partition values for the shuffling exchange.
+  std::vector<ScalarExpr*> partition_exprs_;
+  std::vector<ScalarExprEvaluator*> partition_expr_evals_;
+
   RuntimeProfile::Counter* serialize_batch_timer_;
   /// The concurrent wall time spent sending data over the network.
   RuntimeProfile::ConcurrentTimerCounter* thrift_transmit_timer_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc 
b/be/src/runtime/data-stream-test.cc
index 4cb2bf0..1048fb6 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -113,6 +113,7 @@ class DataStreamTest : public testing::Test {
     // Initialize MemTrackers and RuntimeState for use by the data stream 
receiver.
     exec_env_.InitForFeTests();
     runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_));
+    mem_pool_.reset(new MemPool(&tracker_));
 
     // Stop tests that rely on mismatched sender / receiver pairs timing out 
from failing.
     FLAGS_datastream_sender_timeout_ms = 250;
@@ -121,6 +122,9 @@ class DataStreamTest : public testing::Test {
 
   virtual void SetUp() {
     CreateRowDesc();
+
+    is_asc_.push_back(true);
+    nulls_first_.push_back(true);
     CreateTupleComparator();
 
     next_instance_id_.lo = 0;
@@ -156,20 +160,29 @@ class DataStreamTest : public testing::Test {
     StartBackend();
   }
 
-  const TDataStreamSink& GetSink(TPartitionType::type partition_type) {
+  const TDataSink GetSink(TPartitionType::type partition_type) {
+    TDataSink tdata_sink;
     switch (partition_type) {
-      case TPartitionType::UNPARTITIONED: return broadcast_sink_;
-      case TPartitionType::RANDOM: return random_sink_;
-      case TPartitionType::HASH_PARTITIONED: return hash_sink_;
-      default: EXPECT_TRUE(false) << "Unhandled sink type: " << partition_type;
+      case TPartitionType::UNPARTITIONED:
+        tdata_sink.__set_stream_sink(broadcast_sink_);
+        break;
+      case TPartitionType::RANDOM:
+        tdata_sink.__set_stream_sink(random_sink_);
+        break;
+      case TPartitionType::HASH_PARTITIONED:
+        tdata_sink.__set_stream_sink(hash_sink_);
+        break;
+      default:
+        EXPECT_TRUE(false) << "Unhandled sink type: " << partition_type;
     }
-    // Should never reach this.
-    return broadcast_sink_;
+    return tdata_sink;
   }
 
   virtual void TearDown() {
-    lhs_slot_ctx_->Close(NULL);
-    rhs_slot_ctx_->Close(NULL);
+    desc_tbl_->ReleaseResources();
+    less_than_->Close(runtime_state_.get());
+    ScalarExpr::Close(ordering_exprs_);
+    mem_pool_->FreeAll();
     exec_env_.impalad_client_cache()->TestShutdown();
     StopBackend();
   }
@@ -182,13 +195,18 @@ class DataStreamTest : public testing::Test {
 
   ObjectPool obj_pool_;
   MemTracker tracker_;
+  scoped_ptr<MemPool> mem_pool_;
   DescriptorTbl* desc_tbl_;
   const RowDescriptor* row_desc_;
+  vector<bool> is_asc_;
+  vector<bool> nulls_first_;
   TupleRowComparator* less_than_;
   ExecEnv exec_env_;
   scoped_ptr<RuntimeState> runtime_state_;
   TUniqueId next_instance_id_;
   string stmt_;
+  // The sorting expression for the single BIGINT column.
+  vector<ScalarExpr*> ordering_exprs_;
 
   // RowBatch generation
   scoped_ptr<RowBatch> batch_;
@@ -210,7 +228,7 @@ class DataStreamTest : public testing::Test {
     Status status;
     int num_bytes_sent;
 
-    SenderInfo(): thread_handle(NULL), num_bytes_sent(0) {}
+    SenderInfo(): thread_handle(nullptr), num_bytes_sent(0) {}
   };
   vector<SenderInfo> sender_info_;
 
@@ -229,7 +247,7 @@ class DataStreamTest : public testing::Test {
       : stream_type(stream_type),
         num_senders(num_senders),
         receiver_num(receiver_num),
-        thread_handle(NULL),
+        thread_handle(nullptr),
         num_rows_received(0) {}
 
     ~ReceiverInfo() {
@@ -271,7 +289,7 @@ class DataStreamTest : public testing::Test {
     slot_desc.__set_nullIndicatorBit(-1);
     slot_desc.__set_slotIdx(0);
     thrift_desc_tbl.slotDescriptors.push_back(slot_desc);
-    EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, nullptr, 
&desc_tbl_));
+    EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, &desc_tbl_));
 
     vector<TTupleId> row_tids;
     row_tids.push_back(0);
@@ -283,19 +301,11 @@ class DataStreamTest : public testing::Test {
   // Create a tuple comparator to sort in ascending order on the single bigint 
column.
   void CreateTupleComparator() {
     SlotRef* lhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
-    lhs_slot_ctx_ = obj_pool_.Add(new ExprContext(lhs_slot));
-    SlotRef* rhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
-    rhs_slot_ctx_ = obj_pool_.Add(new ExprContext(rhs_slot));
-
-    lhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_);
-    rhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_);
-    lhs_slot_ctx_->Open(NULL);
-    rhs_slot_ctx_->Open(NULL);
-    SortExecExprs* sort_exprs = obj_pool_.Add(new SortExecExprs());
-    sort_exprs->Init(
-        vector<ExprContext*>(1, lhs_slot_ctx_), vector<ExprContext*>(1, 
rhs_slot_ctx_));
-    less_than_ = obj_pool_.Add(new TupleRowComparator(
-        *sort_exprs, vector<bool>(1, true), vector<bool>(1, false)));
+    ASSERT_OK(lhs_slot->Init(RowDescriptor(), runtime_state_.get()));
+    ordering_exprs_.push_back(lhs_slot);
+    less_than_ = obj_pool_.Add(new TupleRowComparator(ordering_exprs_,
+        is_asc_, nulls_first_));
+    less_than_->Open(&obj_pool_, runtime_state_.get(), mem_pool_.get());
   }
 
   // Create batch_, but don't fill it with data yet. Assumes we created 
row_desc_.
@@ -323,7 +333,7 @@ class DataStreamTest : public testing::Test {
 
   // Start receiver (expecting given number of senders) in separate thread.
   void StartReceiver(TPartitionType::type stream_type, int num_senders, int 
receiver_num,
-                     int buffer_size, bool is_merging, TUniqueId* out_id = 
NULL) {
+      int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) {
     VLOG_QUERY << "start receiver";
     RuntimeProfile* profile =
         obj_pool_.Add(new RuntimeProfile(&obj_pool_, "TestReceiver"));
@@ -339,7 +349,7 @@ class DataStreamTest : public testing::Test {
       info.thread_handle = new thread(&DataStreamTest::ReadStreamMerging, 
this, &info,
           profile);
     }
-    if (out_id != NULL) *out_id = instance_id;
+    if (out_id != nullptr) *out_id = instance_id;
   }
 
   void JoinReceivers() {
@@ -355,7 +365,7 @@ class DataStreamTest : public testing::Test {
     RowBatch* batch;
     VLOG_QUERY <<  "start reading";
     while (!(info->status = 
info->stream_recvr->GetBatch(&batch)).IsCancelled() &&
-        (batch != NULL)) {
+        (batch != nullptr)) {
       VLOG_QUERY << "read batch #rows=" << batch->num_rows();
       for (int i = 0; i < batch->num_rows(); ++i) {
         TupleRow* row = batch->GetRow(i);
@@ -444,7 +454,7 @@ class DataStreamTest : public testing::Test {
   void StartBackend() {
     boost::shared_ptr<ImpalaTestBackend> handler(new 
ImpalaTestBackend(stream_mgr_));
     boost::shared_ptr<TProcessor> processor(new 
ImpalaInternalServiceProcessor(handler));
-    server_ = new ThriftServer("DataStreamTest backend", processor, 
FLAGS_port, NULL);
+    server_ = new ThriftServer("DataStreamTest backend", processor, 
FLAGS_port, nullptr);
     server_->Start();
   }
 
@@ -477,9 +487,16 @@ class DataStreamTest : public testing::Test {
       int sender_num, int channel_buffer_size, TPartitionType::type 
partition_type) {
     RuntimeState state(TQueryCtx(), &exec_env_, desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
-    const TDataStreamSink& sink = GetSink(partition_type);
+    const TDataSink& sink = GetSink(partition_type);
     DataStreamSender sender(
-        &obj_pool_, sender_num, *row_desc_, sink, dest_, channel_buffer_size);
+        sender_num, *row_desc_, sink.stream_sink, dest_, channel_buffer_size);
+
+    TExprNode expr_node;
+    expr_node.node_type = TExprNodeType::SLOT_REF;
+    TExpr output_exprs;
+    output_exprs.nodes.push_back(expr_node);
+    EXPECT_OK(sender.Init(vector<TExpr>({output_exprs}), sink, &state));
+
     EXPECT_OK(sender.Prepare(&state, &tracker_));
     EXPECT_OK(sender.Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
@@ -517,10 +534,6 @@ class DataStreamTest : public testing::Test {
     JoinReceivers();
     CheckReceivers(stream_type, num_senders);
   }
-
- private:
-  ExprContext* lhs_slot_ctx_;
-  ExprContext* rhs_slot_ctx_;
 };
 
 TEST_F(DataStreamTest, UnknownSenderSmallResult) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/descriptors.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index bb7c1d0..ad66c56 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -27,9 +27,11 @@
 
 #include "codegen/llvm-codegen.h"
 #include "common/object-pool.h"
+#include "common/status.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "gen-cpp/Descriptors_types.h"
 #include "gen-cpp/PlanNodes_types.h"
-#include "exprs/expr.h"
 #include "runtime/runtime-state.h"
 
 #include "common/names.h"
@@ -173,25 +175,17 @@ string TableDescriptor::DebugString() const {
   return out.str();
 }
 
-HdfsPartitionDescriptor::HdfsPartitionDescriptor(const THdfsTable& 
thrift_table,
-    const THdfsPartition& thrift_partition, ObjectPool* pool)
+HdfsPartitionDescriptor::HdfsPartitionDescriptor(
+    const THdfsTable& thrift_table, const THdfsPartition& thrift_partition)
   : line_delim_(thrift_partition.lineDelim),
     field_delim_(thrift_partition.fieldDelim),
     collection_delim_(thrift_partition.collectionDelim),
     escape_char_(thrift_partition.escapeChar),
     block_size_(thrift_partition.blockSize),
     id_(thrift_partition.id),
-    file_format_(thrift_partition.fileFormat),
-    object_pool_(pool) {
+    thrift_partition_key_exprs_(thrift_partition.partitionKeyExprs),
+    file_format_(thrift_partition.fileFormat) {
   DecompressLocation(thrift_table, thrift_partition, &location_);
-  for (int i = 0; i < thrift_partition.partitionKeyExprs.size(); ++i) {
-    ExprContext* ctx;
-    // TODO: Move to dedicated Init method and treat Status return correctly
-    Status status = Expr::CreateExprTree(object_pool_,
-        thrift_partition.partitionKeyExprs[i], &ctx);
-    DCHECK(status.ok());
-    partition_key_value_ctxs_.push_back(ctx);
-  }
 }
 
 string HdfsPartitionDescriptor::DebugString() const {
@@ -210,17 +204,14 @@ string DataSourceTableDescriptor::DebugString() const {
   return out.str();
 }
 
-HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc,
-    ObjectPool* pool)
+HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, 
ObjectPool* pool)
   : TableDescriptor(tdesc),
     hdfs_base_dir_(tdesc.hdfsTable.hdfsBaseDir),
     null_partition_key_value_(tdesc.hdfsTable.nullPartitionKeyValue),
-    null_column_value_(tdesc.hdfsTable.nullColumnValue),
-    object_pool_(pool) {
+    null_column_value_(tdesc.hdfsTable.nullColumnValue) {
   for (const auto& entry : tdesc.hdfsTable.partitions) {
     HdfsPartitionDescriptor* partition =
-        new HdfsPartitionDescriptor(tdesc.hdfsTable, entry.second, pool);
-    object_pool_->Add(partition);
+        pool->Add(new HdfsPartitionDescriptor(tdesc.hdfsTable, entry.second));
     partition_descriptors_[entry.first] = partition;
   }
   avro_schema_ = tdesc.hdfsTable.__isset.avroSchema ? 
tdesc.hdfsTable.avroSchema : "";
@@ -479,8 +470,31 @@ string RowDescriptor::DebugString() const {
   return ss.str();
 }
 
+Status DescriptorTbl::CreatePartKeyExprs(
+    const HdfsTableDescriptor& hdfs_tbl, ObjectPool* pool) {
+  // Create the partition key exprs and open their evaluators.
+  for (const auto& part_entry : hdfs_tbl.partition_descriptors()) {
+    HdfsPartitionDescriptor* part_desc = part_entry.second;
+    vector<ScalarExpr*> partition_key_value_exprs;
+    RETURN_IF_ERROR(ScalarExpr::Create(part_desc->thrift_partition_key_exprs_,
+         RowDescriptor(), nullptr, pool, &partition_key_value_exprs));
+    for (const ScalarExpr* partition_expr : partition_key_value_exprs) {
+      DCHECK(partition_expr->IsLiteral());
+      DCHECK(!partition_expr->HasFnCtx());
+      DCHECK_EQ(partition_expr->GetNumChildren(), 0);
+    }
+    // TODO: RowDescriptor should arguably be optional in ScalarExpr::Create() for known literals.
+    // Partition exprs are not used in the codegen case. Don't codegen them.
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_value_exprs, 
nullptr,
+        pool, nullptr, &part_desc->partition_key_value_evals_));
+    RETURN_IF_ERROR(ScalarExprEvaluator::Open(
+        part_desc->partition_key_value_evals_, nullptr));
+  }
+  return Status::OK();
+}
+
 Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& 
thrift_tbl,
-    MemTracker* mem_tracker, DescriptorTbl** tbl) {
+    DescriptorTbl** tbl) {
   *tbl = pool->Add(new DescriptorTbl());
   // deserialize table descriptors first, they are being referenced by tuple 
descriptors
   for (size_t i = 0; i < thrift_tbl.tableDescriptors.size(); ++i) {
@@ -489,21 +503,8 @@ Status DescriptorTbl::Create(ObjectPool* pool, const 
TDescriptorTable& thrift_tb
     switch (tdesc.tableType) {
       case TTableType::HDFS_TABLE:
         desc = pool->Add(new HdfsTableDescriptor(tdesc, pool));
-
-        if (mem_tracker != nullptr) {
-          // prepare and open partition exprs
-          const HdfsTableDescriptor* hdfs_tbl =
-              static_cast<const HdfsTableDescriptor*>(desc);
-          for (const auto& part_entry : hdfs_tbl->partition_descriptors()) {
-            // TODO: RowDescriptor should arguably be optional in Prepare for 
known
-            // literals Partition exprs are not used in the codegen case.  
Don't codegen
-            // them.
-            
RETURN_IF_ERROR(Expr::Prepare(part_entry.second->partition_key_value_ctxs(),
-                nullptr, RowDescriptor(), mem_tracker));
-            RETURN_IF_ERROR(Expr::Open(
-                part_entry.second->partition_key_value_ctxs(), nullptr));
-          }
-        }
+        RETURN_IF_ERROR(CreatePartKeyExprs(
+            *static_cast<const HdfsTableDescriptor*>(desc), pool));
         break;
       case TTableType::HBASE_TABLE:
         desc = pool->Add(new HBaseTableDescriptor(tdesc));
@@ -552,7 +553,11 @@ void DescriptorTbl::ReleaseResources() {
     const HdfsTableDescriptor* hdfs_tbl =
         static_cast<const HdfsTableDescriptor*>(entry.second);
     for (const auto& part_entry: hdfs_tbl->partition_descriptors()) {
-      Expr::Close(part_entry.second->partition_key_value_ctxs(), nullptr);
+      for (ScalarExprEvaluator* eval :
+             part_entry.second->partition_key_value_evals()) {
+        eval->Close(nullptr);
+        const_cast<ScalarExpr&>(eval->root()).Close();
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/descriptors.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 06c65c5..8a8dd7b 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -24,8 +24,8 @@
 #include <boost/scoped_ptr.hpp>
 #include <ostream>
 
-#include "common/status.h"
 #include "common/global-types.h"
+#include "common/status.h"
 #include "runtime/types.h"
 
 #include "gen-cpp/Descriptors_types.h"  // for TTupleId
@@ -40,13 +40,12 @@ namespace llvm {
 
 namespace impala {
 
-class Expr;
-class ExprContext;
 class LlvmBuilder;
 class LlvmCodeGen;
 class ObjectPool;
 class RuntimeState;
-class MemTracker;
+class ScalarExpr;
+class ScalarExprEvaluator;
 class TDescriptorTable;
 class TSlotDescriptor;
 class TTable;
@@ -191,7 +190,7 @@ class SlotDescriptor {
   SlotDescriptor(const TSlotDescriptor& tdesc, const TupleDescriptor* parent,
       const TupleDescriptor* collection_item_descriptor);
 
-  /// Generate LLVM code at the insert position of 'builder' to get the i8 
value of the
+  /// Generate LLVM code at the insert position of 'builder' to get the i8 
value of
   /// the byte containing 'null_indicator_offset' in 'tuple'. If 
'null_byte_ptr' is
   /// non-NULL, sets that to a pointer to the null byte.
   static llvm::Value* CodegenGetNullByte(LlvmCodeGen* codegen, LlvmBuilder* 
builder,
@@ -249,7 +248,7 @@ class TableDescriptor {
 class HdfsPartitionDescriptor {
  public:
   HdfsPartitionDescriptor(const THdfsTable& thrift_table,
-      const THdfsPartition& thrift_partition, ObjectPool* pool);
+      const THdfsPartition& thrift_partition);
 
   char line_delim() const { return line_delim_; }
   char field_delim() const { return field_delim_; }
@@ -261,14 +260,16 @@ class HdfsPartitionDescriptor {
   int64_t id() const { return id_; }
   std::string DebugString() const;
 
-  /// It is safe to evaluate the returned expr contexts concurrently from 
multiple
+  /// It is safe to call the returned expr evaluators concurrently from 
multiple
   /// threads because all exprs are literals, after the descriptor table has 
been
   /// opened.
-  const std::vector<ExprContext*>& partition_key_value_ctxs() const {
-    return partition_key_value_ctxs_;
+  const std::vector<ScalarExprEvaluator*>& partition_key_value_evals() const {
+    return partition_key_value_evals_;
   }
 
  private:
+  friend class DescriptorTbl;
+
   char line_delim_;
   char field_delim_;
   char collection_delim_;
@@ -284,14 +285,15 @@ class HdfsPartitionDescriptor {
   /// The Prepare()/Open()/Close() cycle is controlled by the containing 
descriptor table
   /// because the same partition descriptor may be used by multiple exec nodes 
with
   /// different lifetimes.
-  std::vector<ExprContext*> partition_key_value_ctxs_;
+  const std::vector<TExpr>& thrift_partition_key_exprs_;
+
+  /// These evaluators can be safely shared by all fragment instances because all
+  /// expressions are literals.
+  /// TODO: replace this with vector<Literal> instead.
+  std::vector<ScalarExprEvaluator*> partition_key_value_evals_;
 
   /// The format (e.g. text, sequence file etc.) of data in the files in this 
partition
   THdfsFileFormat::type file_format_;
-
-  /// For allocating expression objects in partition_key_values_
-  /// Owned by DescriptorTbl, supplied in constructor.
-  ObjectPool* object_pool_;
 };
 
 class HdfsTableDescriptor : public TableDescriptor {
@@ -327,8 +329,6 @@ class HdfsTableDescriptor : public TableDescriptor {
   PartitionIdToDescriptorMap partition_descriptors_;
   /// Set to the table's Avro schema if this is an Avro table, empty string 
otherwise
   std::string avro_schema_;
-  /// Owned by DescriptorTbl
-  ObjectPool* object_pool_;
 };
 
 class HBaseTableDescriptor : public TableDescriptor {
@@ -461,13 +461,9 @@ class TupleDescriptor {
 class DescriptorTbl {
  public:
   /// Creates a descriptor tbl within 'pool' from thrift_tbl and returns it 
via 'tbl'.
-  /// If mem_tracker_ != nullptr, also opens partition exprs for hdfs tables 
(and does
-  /// memory allocation against that tracker).
   /// Returns OK on success, otherwise error (in which case 'tbl' will be 
unset).
-  /// TODO: when cleaning up ExprCtx, remove the need to pass in a memtracker 
for literal
-  /// exprs that don't require additional memory at runtime.
   static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl,
-      MemTracker* mem_tracker, DescriptorTbl** tbl);
+      DescriptorTbl** tbl) WARN_UNUSED_RESULT;
 
   /// Free memory allocated in Create().
   void ReleaseResources();
@@ -491,6 +487,9 @@ class DescriptorTbl {
   SlotDescriptorMap slot_desc_map_;
 
   DescriptorTbl(): tbl_desc_map_(), tuple_desc_map_(), slot_desc_map_() {}
+
+  static Status CreatePartKeyExprs(
+      const HdfsTableDescriptor& hdfs_tbl, ObjectPool* pool) 
WARN_UNUSED_RESULT;
 };
 
 /// Records positions of tuples within row produced by ExecNode.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index 5f109f5..2385eab 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -185,9 +185,8 @@ Status FragmentInstanceState::Prepare() {
 
   // prepare sink_
   DCHECK(fragment_ctx_.fragment.__isset.output_sink);
-  RETURN_IF_ERROR(
-      DataSink::Create(
-          obj_pool(), fragment_ctx_, instance_ctx_, exec_tree_->row_desc(), 
&sink_));
+  RETURN_IF_ERROR(DataSink::Create(fragment_ctx_, instance_ctx_, 
exec_tree_->row_desc(),
+      runtime_state_, &sink_));
   RETURN_IF_ERROR(sink_->Prepare(runtime_state_, 
runtime_state_->instance_mem_tracker()));
   RuntimeProfile* sink_profile = sink_->profile();
   if (sink_profile != nullptr) profile()->AddChild(sink_profile);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 90f0185..0ee2c12 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -266,8 +266,7 @@ void QueryState::StartFInstances() {
 
   // set up desc tbl
   DCHECK(query_ctx().__isset.desc_tbl);
-  Status status = DescriptorTbl::Create(
-      &obj_pool_, query_ctx().desc_tbl, query_mem_tracker_, &desc_tbl_);
+  Status status = DescriptorTbl::Create(&obj_pool_, query_ctx().desc_tbl, 
&desc_tbl_);
   if (!status.ok()) {
     instances_prepared_promise_.Set(status);
     ReportExecStatusAux(true, status, nullptr, false);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index b953982..ab70d4a 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -90,8 +90,8 @@ class RuntimeFilter {
   /// compact way of representing a full Bloom filter that contains every 
element.
   BloomFilter* bloom_filter_;
 
-  /// Descriptor of the filter.
-  TRuntimeFilterDesc filter_desc_;
+  /// Reference to the filter's thrift descriptor in the thrift Plan tree.
+  const TRuntimeFilterDesc& filter_desc_;
 
   /// Time, in ms, that the filter was registered.
   int64_t registration_time_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index eb11072..1224345 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -29,7 +29,7 @@
 #include "codegen/llvm-codegen.h"
 #include "common/object-pool.h"
 #include "common/status.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "exprs/scalar-fn-call.h"
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/bufferpool/reservation-tracker.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc 
b/be/src/runtime/sorted-run-merger.cc
index 8f8891a..36d7a61 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -12,7 +12,7 @@
 // limitations under the License.
 
 #include "runtime/sorted-run-merger.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
 #include "runtime/row-batch.h"
 #include "runtime/sorter.h"

Reply via email to