http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/scalar-fn-call.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc index 59aac6d..73b497d 100644 --- a/be/src/exprs/scalar-fn-call.cc +++ b/be/src/exprs/scalar-fn-call.cc @@ -30,7 +30,7 @@ #include "codegen/codegen-anyval.h" #include "codegen/llvm-codegen.h" #include "exprs/anyval-util.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/lib-cache.h" #include "runtime/runtime-state.h" @@ -57,7 +57,7 @@ using std::pair; #define MAX_INTERP_ARGS 20 ScalarFnCall::ScalarFnCall(const TExprNode& node) - : Expr(node), + : ScalarExpr(node), vararg_start_idx_(node.__isset.vararg_start_idx ? node.vararg_start_idx : -1), scalar_fn_wrapper_(NULL), prepare_fn_(NULL), @@ -78,9 +78,9 @@ Status ScalarFnCall::LoadPrepareAndCloseFn(LlvmCodeGen* codegen) { return Status::OK(); } -Status ScalarFnCall::Prepare(RuntimeState* state, const RowDescriptor& desc, - ExprContext* context) { - RETURN_IF_ERROR(Expr::Prepare(state, desc, context)); +Status ScalarFnCall::Init(const RowDescriptor& desc, RuntimeState* state) { + // Initialize children first. + RETURN_IF_ERROR(ScalarExpr::Init(desc, state)); if (fn_.scalar_fn.symbol.empty()) { // This path is intended to only be used during development to test FE @@ -94,17 +94,11 @@ Status ScalarFnCall::Prepare(RuntimeState* state, const RowDescriptor& desc, } // Check if the function takes CHAR as input or returns CHAR. - FunctionContext::TypeDesc return_type = AnyValUtil::ColumnTypeToTypeDesc(type_); - vector<FunctionContext::TypeDesc> arg_types; bool has_char_arg_or_result = type_.type == TYPE_CHAR; - for (int i = 0; i < children_.size(); ++i) { - arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(children_[i]->type_)); + for (int i = 0; !has_char_arg_or_result && i < children_.size(); ++i) { has_char_arg_or_result |= children_[i]->type_.type == TYPE_CHAR; } - fn_context_index_ = - context->Register(state, return_type, arg_types, ComputeVarArgsBufferSize()); - // Use the interpreted path and call the builtin without codegen if any of the // followings is true: // 1. codegen is disabled by query option @@ -155,50 +149,63 @@ Status ScalarFnCall::Prepare(RuntimeState* state, const RowDescriptor& desc, state->AddScalarFnToCodegen(this); } - // For IR UDF, the loading of the Prepare() and Close() functions is deferred until + // For IR UDF, the loading of the Init() and CloseContext() functions is deferred until // first time GetCodegendComputeFn() is invoked. if (!is_ir_udf) RETURN_IF_ERROR(LoadPrepareAndCloseFn(NULL)); return Status::OK(); } -Status ScalarFnCall::Open(RuntimeState* state, ExprContext* ctx, - FunctionContext::FunctionStateScope scope) { +Status ScalarFnCall::OpenEvaluator(FunctionContext::FunctionStateScope scope, + RuntimeState* state, ScalarExprEvaluator* eval) const { // Opens and inits children - RETURN_IF_ERROR(Expr::Open(state, ctx, scope)); - FunctionContext* fn_ctx = ctx->fn_context(fn_context_index_); + RETURN_IF_ERROR(ScalarExpr::OpenEvaluator(scope, state, eval)); + DCHECK_GE(fn_ctx_idx_, 0); + FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_); + bool is_interpreted = scalar_fn_wrapper_ == nullptr; - if (scalar_fn_wrapper_ == NULL) { + if (is_interpreted) { // We're in the interpreted path (i.e. no JIT). 
Populate our FunctionContext's // staging_input_vals, which will be reused across calls to scalar_fn_. - DCHECK(scalar_fn_ != NULL); + DCHECK(scalar_fn_ != nullptr); vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals(); for (int i = 0; i < NumFixedArgs(); ++i) { AnyVal* input_val; - RETURN_IF_ERROR(AllocateAnyVal(state, ctx->pool_.get(), children_[i]->type(), + RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), children_[i]->type(), "Could not allocate expression value", &input_val)); input_vals->push_back(input_val); } } // Only evaluate constant arguments at the top level of function contexts. - // If 'ctx' was cloned, the constant values were copied from the parent. + // If 'eval' was cloned, the constant values were copied from the parent. if (scope == FunctionContext::FRAGMENT_LOCAL) { vector<AnyVal*> constant_args; - for (int i = 0; i < children_.size(); ++i) { + for (const ScalarExpr* child : children()) { AnyVal* const_val; - RETURN_IF_ERROR(children_[i]->GetConstVal(state, ctx, &const_val)); + RETURN_IF_ERROR(eval->GetConstValue(state, *child, &const_val)); constant_args.push_back(const_val); } fn_ctx->impl()->SetConstantArgs(move(constant_args)); + + // If we're calling MathFunctions::RoundUpTo(), we need to set output_scale_ + // which determines how many decimal places are printed. + // TODO: Move this to Expr initialization. + if (this == &eval->root()) { + if (fn_.name.function_name == "round" && type_.type == TYPE_DOUBLE) { + DCHECK_EQ(children_.size(), 2); + IntVal* scale_arg = reinterpret_cast<IntVal*>(constant_args[1]); + if (scale_arg != nullptr) eval->output_scale_ = scale_arg->val; + } + } } - if (scalar_fn_wrapper_ == NULL) { + if (is_interpreted) { // Now we have the constant values, cache them so that the interpreted path can // call the UDF without reevaluating the arguments. 'staging_input_vals' and // 'varargs_buffer' in the FunctionContext are used to pass fixed and variable-length // arguments respectively. 'non_constant_args()' in the FunctionContext will contain // pointers to the remaining (non-constant) children that are evaluated for every row. - vector<pair<Expr*, AnyVal*>> non_constant_args; + vector<pair<ScalarExpr*, AnyVal*>> non_constant_args; uint8_t* varargs_buffer = fn_ctx->impl()->varargs_buffer(); for (int i = 0; i < children_.size(); ++i) { AnyVal* input_arg; @@ -222,7 +229,7 @@ Status ScalarFnCall::Open(RuntimeState* state, ExprContext* ctx, // be able to trust is_constant() and switch back to that. if (children_[i]->IsLiteral()) { const AnyVal* constant_arg = fn_ctx->impl()->constant_args()[i]; - DCHECK(constant_arg != NULL); + DCHECK(constant_arg != nullptr); memcpy(input_arg, constant_arg, arg_bytes); } else { non_constant_args.emplace_back(children_[i], input_arg); @@ -231,7 +238,7 @@ Status ScalarFnCall::Open(RuntimeState* state, ExprContext* ctx, fn_ctx->impl()->SetNonConstantArgs(move(non_constant_args)); } - if (prepare_fn_ != NULL) { + if (prepare_fn_ != nullptr) { if (scope == FunctionContext::FRAGMENT_LOCAL) { prepare_fn_(fn_ctx, FunctionContext::FRAGMENT_LOCAL); if (fn_ctx->has_error()) return Status(fn_ctx->error_msg()); @@ -240,31 +247,20 @@ Status ScalarFnCall::Open(RuntimeState* state, ExprContext* ctx, if (fn_ctx->has_error()) return Status(fn_ctx->error_msg()); } - // If we're calling MathFunctions::RoundUpTo(), we need to set output_scale_, which - // determines how many decimal places are printed. - // TODO: revisit this. We should be able to do this if the scale argument is - // non-constant. 
- if (fn_.name.function_name == "round" && type_.type == TYPE_DOUBLE) { - DCHECK_EQ(children_.size(), 2); - if (children_[1]->is_constant()) { - IntVal scale_arg = children_[1]->GetIntVal(ctx, NULL); - output_scale_ = scale_arg.val; - } - } - return Status::OK(); } -void ScalarFnCall::Close(RuntimeState* state, ExprContext* context, - FunctionContext::FunctionStateScope scope) { - if (fn_context_index_ != -1 && close_fn_ != NULL) { - FunctionContext* fn_ctx = context->fn_context(fn_context_index_); +void ScalarFnCall::CloseEvaluator(FunctionContext::FunctionStateScope scope, + RuntimeState* state, ScalarExprEvaluator* eval) const { + DCHECK_GE(fn_ctx_idx_, 0); + if (close_fn_ != NULL) { + FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_); close_fn_(fn_ctx, FunctionContext::THREAD_LOCAL); if (scope == FunctionContext::FRAGMENT_LOCAL) { close_fn_(fn_ctx, FunctionContext::FRAGMENT_LOCAL); } } - Expr::Close(state, context, scope); + ScalarExpr::CloseEvaluator(scope, state, eval); } // Dynamically loads the pre-compiled UDF and codegens a function that calls each child's @@ -336,8 +332,8 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* codegen, Function** fn) { fn_name << udf->getName().str() << "Wrapper"; Value* args[2]; - *fn = CreateIrFunctionPrototype(codegen, fn_name.str(), &args); - Value* expr_ctx = args[0]; + *fn = CreateIrFunctionPrototype(fn_name.str(), codegen, &args); + Value* eval = args[0]; Value* row = args[1]; BasicBlock* block = BasicBlock::Create(codegen->context(), "entry", *fn); LlvmBuilder builder(block); @@ -346,12 +342,11 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* codegen, Function** fn) { vector<Value*> udf_args; // First argument is always FunctionContext*. - // Index into our registered offset in the ExprContext. - Value* expr_ctx_gep = builder.CreateStructGEP(NULL, expr_ctx, 1, "expr_ctx_gep"); - Value* fn_ctxs_base = builder.CreateLoad(expr_ctx_gep, "fn_ctxs_base"); + // Index into our registered offset in the ScalarFnEvaluator. 
+ Value* eval_gep = builder.CreateStructGEP(NULL, eval, 1, "eval_gep"); + Value* fn_ctxs_base = builder.CreateLoad(eval_gep, "fn_ctxs_base"); // Use GEP to add our index to the base pointer - Value* fn_ctx_ptr = - builder.CreateConstGEP1_32(fn_ctxs_base, fn_context_index_, "fn_ctx_ptr"); + Value* fn_ctx_ptr = builder.CreateConstGEP1_32(fn_ctxs_base, fn_ctx_idx_, "fn_ctx_ptr"); Value* fn_ctx = builder.CreateLoad(fn_ctx_ptr, "fn_ctx"); udf_args.push_back(fn_ctx); @@ -371,16 +366,17 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* codegen, Function** fn) { for (int i = 0; i < GetNumChildren(); ++i) { Function* child_fn = NULL; vector<Value*> child_fn_args; - // Set 'child_fn' to the codegen'd function, sets child_fn = NULL if codegen fails - children_[i]->GetCodegendComputeFn(codegen, &child_fn); - if (child_fn == NULL) { + // Set 'child_fn' to the codegen'd function, sets child_fn == NULL if codegen fails + Status status = children_[i]->GetCodegendComputeFn(codegen, &child_fn); + if (UNLIKELY(!status.ok())) { + DCHECK(child_fn == NULL); // Set 'child_fn' to the interpreted function child_fn = GetStaticGetValWrapper(children_[i]->type(), codegen); // First argument to interpreted function is children_[i] - Type* expr_ptr_type = codegen->GetPtrType(Expr::LLVM_CLASS_NAME); + Type* expr_ptr_type = codegen->GetPtrType(ScalarExpr::LLVM_CLASS_NAME); child_fn_args.push_back(codegen->CastPtrToLlvmPtr(expr_ptr_type, children_[i])); } - child_fn_args.push_back(expr_ctx); + child_fn_args.push_back(eval); child_fn_args.push_back(row); // Call 'child_fn', adding the result to either 'udf_args' or 'varargs_buffer' @@ -453,20 +449,21 @@ Status ScalarFnCall::GetFunction(LlvmCodeGen* codegen, const string& symbol, voi } void ScalarFnCall::EvaluateNonConstantChildren( - ExprContext* context, const TupleRow* row) { - FunctionContext* fn_ctx = context->fn_context(fn_context_index_); - for (pair<Expr*, AnyVal*> child : fn_ctx->impl()->non_constant_args()) { - void* val = context->GetValue(child.first, row); + ScalarExprEvaluator* eval, const TupleRow* row) const { + FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_); + for (pair<ScalarExpr*, AnyVal*> child : fn_ctx->impl()->non_constant_args()) { + void* val = eval->GetValue(*(child.first), row); AnyValUtil::SetAnyVal(val, child.first->type(), child.second); } } template<typename RETURN_TYPE> -RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* context, const TupleRow* row) { +RETURN_TYPE ScalarFnCall::InterpretEval(ScalarExprEvaluator* eval, + const TupleRow* row) const { DCHECK(scalar_fn_ != NULL) << DebugString(); - FunctionContext* fn_ctx = context->fn_context(fn_context_index_); + FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_); vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals(); - EvaluateNonConstantChildren(context, row); + EvaluateNonConstantChildren(eval, row); if (vararg_start_idx_ == -1) { switch (children_.size()) { @@ -480,7 +477,7 @@ RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* context, const TupleRow* ro // case X: // typedef RETURN_TYPE (*ScalarFnX)(FunctionContext*, const AnyVal& a1, ..., // const AnyVal& aX); - // return reinterpret_cast<ScalarFnn>(scalar_fn_)(fn_ctx, *(*input_vals)[0], ..., + // return reinterpret_cast<ScalarFnX>(scalar_fn_)(fn_ctx, *(*input_vals)[0], ..., // *(*input_vals)[X-1]); #define SCALAR_FN_TYPE(n) BOOST_PP_CAT(ScalarFn, n) #define INTERP_SCALAR_FN(z, n, unused) \ @@ -524,102 +521,112 @@ RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* context, const TupleRow* ro return 
RETURN_TYPE::null(); } -typedef BooleanVal (*BooleanWrapper)(ExprContext*, const TupleRow*); -typedef TinyIntVal (*TinyIntWrapper)(ExprContext*, const TupleRow*); -typedef SmallIntVal (*SmallIntWrapper)(ExprContext*, const TupleRow*); -typedef IntVal (*IntWrapper)(ExprContext*, const TupleRow*); -typedef BigIntVal (*BigIntWrapper)(ExprContext*, const TupleRow*); -typedef FloatVal (*FloatWrapper)(ExprContext*, const TupleRow*); -typedef DoubleVal (*DoubleWrapper)(ExprContext*, const TupleRow*); -typedef StringVal (*StringWrapper)(ExprContext*, const TupleRow*); -typedef TimestampVal (*TimestampWrapper)(ExprContext*, const TupleRow*); -typedef DecimalVal (*DecimalWrapper)(ExprContext*, const TupleRow*); +typedef BooleanVal (*BooleanWrapper)(ScalarExprEvaluator*, const TupleRow*); +typedef TinyIntVal (*TinyIntWrapper)(ScalarExprEvaluator*, const TupleRow*); +typedef SmallIntVal (*SmallIntWrapper)(ScalarExprEvaluator*, const TupleRow*); +typedef IntVal (*IntWrapper)(ScalarExprEvaluator*, const TupleRow*); +typedef BigIntVal (*BigIntWrapper)(ScalarExprEvaluator*, const TupleRow*); +typedef FloatVal (*FloatWrapper)(ScalarExprEvaluator*, const TupleRow*); +typedef DoubleVal (*DoubleWrapper)(ScalarExprEvaluator*, const TupleRow*); +typedef StringVal (*StringWrapper)(ScalarExprEvaluator*, const TupleRow*); +typedef TimestampVal (*TimestampWrapper)(ScalarExprEvaluator*, const TupleRow*); +typedef DecimalVal (*DecimalWrapper)(ScalarExprEvaluator*, const TupleRow*); // TODO: macroify this? -BooleanVal ScalarFnCall::GetBooleanVal(ExprContext* context, const TupleRow* row) { +BooleanVal ScalarFnCall::GetBooleanVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_BOOLEAN); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<BooleanVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == NULL) return InterpretEval<BooleanVal>(eval, row); BooleanWrapper fn = reinterpret_cast<BooleanWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } -TinyIntVal ScalarFnCall::GetTinyIntVal(ExprContext* context, const TupleRow* row) { +TinyIntVal ScalarFnCall::GetTinyIntVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_TINYINT); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<TinyIntVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == NULL) return InterpretEval<TinyIntVal>(eval, row); TinyIntWrapper fn = reinterpret_cast<TinyIntWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } -SmallIntVal ScalarFnCall::GetSmallIntVal(ExprContext* context, const TupleRow* row) { +SmallIntVal ScalarFnCall::GetSmallIntVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_SMALLINT); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<SmallIntVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == NULL) return InterpretEval<SmallIntVal>(eval, row); SmallIntWrapper fn = reinterpret_cast<SmallIntWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } -IntVal ScalarFnCall::GetIntVal(ExprContext* context, const TupleRow* row) { +IntVal ScalarFnCall::GetIntVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_INT); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<IntVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == 
NULL) return InterpretEval<IntVal>(eval, row); IntWrapper fn = reinterpret_cast<IntWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } -BigIntVal ScalarFnCall::GetBigIntVal(ExprContext* context, const TupleRow* row) { +BigIntVal ScalarFnCall::GetBigIntVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_BIGINT); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<BigIntVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == NULL) return InterpretEval<BigIntVal>(eval, row); BigIntWrapper fn = reinterpret_cast<BigIntWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } -FloatVal ScalarFnCall::GetFloatVal(ExprContext* context, const TupleRow* row) { +FloatVal ScalarFnCall::GetFloatVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_FLOAT); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<FloatVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == NULL) return InterpretEval<FloatVal>(eval, row); FloatWrapper fn = reinterpret_cast<FloatWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } -DoubleVal ScalarFnCall::GetDoubleVal(ExprContext* context, const TupleRow* row) { +DoubleVal ScalarFnCall::GetDoubleVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_DOUBLE); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<DoubleVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == NULL) return InterpretEval<DoubleVal>(eval, row); DoubleWrapper fn = reinterpret_cast<DoubleWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } -StringVal ScalarFnCall::GetStringVal(ExprContext* context, const TupleRow* row) { +StringVal ScalarFnCall::GetStringVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK(type_.IsStringType()); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<StringVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == NULL) return InterpretEval<StringVal>(eval, row); StringWrapper fn = reinterpret_cast<StringWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } -TimestampVal ScalarFnCall::GetTimestampVal(ExprContext* context, const TupleRow* row) { +TimestampVal ScalarFnCall::GetTimestampVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_TIMESTAMP); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<TimestampVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == NULL) return InterpretEval<TimestampVal>(eval, row); TimestampWrapper fn = reinterpret_cast<TimestampWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } -DecimalVal ScalarFnCall::GetDecimalVal(ExprContext* context, const TupleRow* row) { +DecimalVal ScalarFnCall::GetDecimalVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_DECIMAL); - DCHECK(context != NULL); - if (scalar_fn_wrapper_ == NULL) return InterpretEval<DecimalVal>(context, row); + DCHECK(eval != NULL); + if (scalar_fn_wrapper_ == NULL) return InterpretEval<DecimalVal>(eval, row); DecimalWrapper fn = reinterpret_cast<DecimalWrapper>(scalar_fn_wrapper_); - return fn(context, row); + return fn(eval, row); } string ScalarFnCall::DebugString() const { stringstream out; 
out << "ScalarFnCall(udf_type=" << fn_.binary_type << " location=" << fn_.hdfs_location - << " symbol_name=" << fn_.scalar_fn.symbol << Expr::DebugString() << ")"; + << " symbol_name=" << fn_.scalar_fn.symbol << ScalarExpr::DebugString() << ")"; return out.str(); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/scalar-fn-call.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/scalar-fn-call.h b/be/src/exprs/scalar-fn-call.h index ffb9a2f..563f91d 100644 --- a/be/src/exprs/scalar-fn-call.h +++ b/be/src/exprs/scalar-fn-call.h @@ -21,15 +21,28 @@ #include <string> -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "udf/udf.h" -using namespace impala_udf; - namespace impala { +using impala_udf::FunctionContext; +using impala_udf::AnyVal; +using impala_udf::BooleanVal; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; +using impala_udf::FloatVal; +using impala_udf::DoubleVal; +using impala_udf::TimestampVal; +using impala_udf::StringVal; +using impala_udf::DecimalVal; + +class ScalarExprEvaluator; class TExprNode; +/// /// Expr for evaluating a pre-compiled native or LLVM IR function that uses the UDF /// interface (i.e. a scalar function). This class overrides GetCodegendComputeFn() to /// return a function that calls any child exprs and passes the results as arguments to the @@ -41,46 +54,58 @@ class TExprNode; /// every possible function signature, codegen may be required to generate the call to the /// function even if codegen is disabled. Codegen will also be used for IR UDFs (note that /// there is no way to specify both a native and IR library for a single UDF). -// +/// +/// Scalar function call: An expr that returns a single scalar value and can be +/// implemented using the UDF interface. Note that this includes builtins, which although +/// not being user-defined still use the same interface as UDFs (i.e., they are +/// implemented as functions with signature "*Val (FunctionContext*, *Val, *Val...)"). +/// /// TODO: /// - Fix error reporting, e.g. 
reporting leaks /// - Testing /// - Test cancellation /// - Type descs in UDA test harness /// - Allow more functions to be NULL in UDA test harness -class ScalarFnCall : public Expr { +class ScalarFnCall : public ScalarExpr { public: - virtual std::string DebugString() const; + virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn) + override WARN_UNUSED_RESULT; + virtual std::string DebugString() const override; protected: - friend class Expr; - friend class RuntimeState; + friend class ScalarExpr; + friend class ScalarExprEvaluator; + + virtual bool HasFnCtx() const override { return true; } ScalarFnCall(const TExprNode& node); - virtual Status Prepare(RuntimeState* state, const RowDescriptor& desc, - ExprContext* context); - virtual Status Open(RuntimeState* state, ExprContext* context, - FunctionContext::FunctionStateScope scope = FunctionContext::FRAGMENT_LOCAL); - virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn); - virtual void Close(RuntimeState* state, ExprContext* context, - FunctionContext::FunctionStateScope scope = FunctionContext::FRAGMENT_LOCAL); - - virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow*); - virtual TinyIntVal GetTinyIntVal(ExprContext* context, const TupleRow*); - virtual SmallIntVal GetSmallIntVal(ExprContext* context, const TupleRow*); - virtual IntVal GetIntVal(ExprContext* context, const TupleRow*); - virtual BigIntVal GetBigIntVal(ExprContext* context, const TupleRow*); - virtual FloatVal GetFloatVal(ExprContext* context, const TupleRow*); - virtual DoubleVal GetDoubleVal(ExprContext* context, const TupleRow*); - virtual StringVal GetStringVal(ExprContext* context, const TupleRow*); - virtual TimestampVal GetTimestampVal(ExprContext* context, const TupleRow*); - virtual DecimalVal GetDecimalVal(ExprContext* context, const TupleRow*); + virtual Status Init(const RowDescriptor& row_desc, RuntimeState* state) + override WARN_UNUSED_RESULT; + virtual Status OpenEvaluator(FunctionContext::FunctionStateScope scope, + RuntimeState* state, ScalarExprEvaluator* eval) const override + WARN_UNUSED_RESULT; + virtual void CloseEvaluator(FunctionContext::FunctionStateScope scope, + RuntimeState* state, ScalarExprEvaluator* eval) const override; + virtual int ComputeVarArgsBufferSize() const override; + + virtual BooleanVal GetBooleanVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual TinyIntVal GetTinyIntVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual SmallIntVal GetSmallIntVal( + ScalarExprEvaluator*, const TupleRow*) const override; + virtual IntVal GetIntVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual BigIntVal GetBigIntVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual FloatVal GetFloatVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual DoubleVal GetDoubleVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual StringVal GetStringVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual TimestampVal GetTimestampVal( + ScalarExprEvaluator*, const TupleRow*) const override; + virtual DecimalVal GetDecimalVal(ScalarExprEvaluator*, const TupleRow*) const override; private: /// If this function has var args, children()[vararg_start_idx_] is the first vararg /// argument. /// If this function does not have varargs, it is set to -1. 
- int vararg_start_idx_; + const int vararg_start_idx_; /// Vector of all non-constant children expressions that need to be evaluated for /// each input row. The first element of each pair is the child expression and the @@ -88,18 +113,18 @@ class ScalarFnCall : public Expr { std::vector<std::pair<Expr*, impala_udf::AnyVal*>> non_constant_children_; /// Function pointer to the JIT'd function produced by GetCodegendComputeFn(). - /// Has signature *Val (ExprContext*, const TupleRow*), and calls the scalar + /// Has signature *Val (ScalarExprEvaluator*, const TupleRow*), and calls the scalar /// function with signature like *Val (FunctionContext*, const *Val& arg1, ...) void* scalar_fn_wrapper_; /// The UDF's prepare function, if specified. This is initialized in Prepare() and /// called in Open() (since we may have needed to codegen the function if it's from an /// IR module). - UdfPrepare prepare_fn_; + impala_udf::UdfPrepare prepare_fn_; /// THe UDF's close function, if specified. This is initialized in Prepare() and called /// in Close(). - UdfClose close_fn_; + impala_udf::UdfClose close_fn_; /// If running with codegen disabled, scalar_fn_ will be a pointer to the non-JIT'd /// scalar function. @@ -110,7 +135,7 @@ class ScalarFnCall : public Expr { return vararg_start_idx_ >= 0 ? vararg_start_idx_ : children_.size(); } - int NumVarArgs() const { return children_.size() - NumFixedArgs(); } + virtual int NumVarArgs() const { return children_.size() - NumFixedArgs(); } const ColumnType& VarArgsType() const { DCHECK_GE(NumVarArgs(), 1); @@ -120,22 +145,21 @@ class ScalarFnCall : public Expr { /// Loads the native or IR function 'symbol' from HDFS and puts the result in *fn. /// If the function is loaded from an IR module, it cannot be called until the module /// has been JIT'd (i.e. after GetCodegendComputeFn() has been called). - Status GetFunction(LlvmCodeGen* codegen, const std::string& symbol, void** fn); + Status GetFunction(LlvmCodeGen* codegen, const std::string& symbol, void** fn) + WARN_UNUSED_RESULT; /// Loads the Prepare() and Close() functions for this ScalarFnCall. They could be /// native or IR functions. To load IR functions, the codegen object must have /// been created and any external LLVM module must have been linked already. - Status LoadPrepareAndCloseFn(LlvmCodeGen* codegen); + Status LoadPrepareAndCloseFn(LlvmCodeGen* codegen) WARN_UNUSED_RESULT; /// Evaluates the non-constant children exprs. Used in the interpreted path. - void EvaluateNonConstantChildren(ExprContext* context, const TupleRow* row); + void EvaluateNonConstantChildren( + ScalarExprEvaluator* eval, const TupleRow* row) const; /// Function to call scalar_fn_. Used in the interpreted path. template <typename RETURN_TYPE> - RETURN_TYPE InterpretEval(ExprContext* context, const TupleRow* row); - - /// Computes the size of the varargs buffer in bytes (0 bytes if no varargs). 
- int ComputeVarArgsBufferSize() const; + RETURN_TYPE InterpretEval(ScalarExprEvaluator* eval, const TupleRow* row) const; }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/slot-ref.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/slot-ref.cc b/be/src/exprs/slot-ref.cc index 3465ff4..49931c3 100644 --- a/be/src/exprs/slot-ref.cc +++ b/be/src/exprs/slot-ref.cc @@ -22,6 +22,7 @@ #include "codegen/codegen-anyval.h" #include "codegen/llvm-codegen.h" +#include "exprs/scalar-expr-evaluator.h" #include "gen-cpp/Exprs_types.h" #include "runtime/collection-value.h" #include "runtime/decimal-value.h" @@ -39,7 +40,7 @@ using namespace llvm; namespace impala { SlotRef::SlotRef(const TExprNode& node) - : Expr(node, true), + : ScalarExpr(node), slot_offset_(-1), // invalid null_indicator_offset_(0, 0), slot_id_(node.slot_ref.slot_id) { @@ -47,7 +48,7 @@ SlotRef::SlotRef(const TExprNode& node) } SlotRef::SlotRef(const SlotDescriptor* desc) - : Expr(desc->type(), false, true), + : ScalarExpr(desc->type(), false), slot_offset_(-1), null_indicator_offset_(0, 0), slot_id_(desc->id()) { @@ -55,48 +56,48 @@ SlotRef::SlotRef(const SlotDescriptor* desc) } SlotRef::SlotRef(const SlotDescriptor* desc, const ColumnType& type) - : Expr(type, false, true), + : ScalarExpr(type, false), slot_offset_(-1), null_indicator_offset_(0, 0), slot_id_(desc->id()) { // slot_/null_indicator_offset_ are set in Prepare() } - SlotRef::SlotRef(const ColumnType& type, int offset, const bool nullable /* = false */) - : Expr(type, false, true), +SlotRef::SlotRef(const ColumnType& type, int offset, const bool nullable /* = false */) + : ScalarExpr(type, false), tuple_idx_(0), slot_offset_(offset), null_indicator_offset_(0, nullable ? 
offset : -1), slot_id_(-1) { } -Status SlotRef::Prepare(RuntimeState* state, const RowDescriptor& row_desc, - ExprContext* context) { +Status SlotRef::Init(const RowDescriptor& row_desc, RuntimeState* state) { DCHECK_EQ(children_.size(), 0); - if (slot_id_ == -1) return Status::OK(); - - const SlotDescriptor* slot_desc = state->desc_tbl().GetSlotDescriptor(slot_id_); - if (slot_desc == NULL) { - // TODO: create macro MAKE_ERROR() that returns a stream - stringstream error; - error << "couldn't resolve slot descriptor " << slot_id_; - LOG(INFO) << error.str(); - return Status(error.str()); - } - tuple_idx_ = row_desc.GetTupleIdx(slot_desc->parent()->id()); - if (tuple_idx_ == RowDescriptor::INVALID_IDX) { - TupleDescriptor* d = state->desc_tbl().GetTupleDescriptor(slot_desc->parent()->id()); - LOG(INFO) << "invalid idx: " << slot_desc->DebugString() - << "\nparent=" << d->DebugString() - << "\nrow=" << row_desc.DebugString(); - stringstream error; - error << "invalid tuple_idx"; - return Status(error.str()); + if (slot_id_ != -1) { + const SlotDescriptor* slot_desc = state->desc_tbl().GetSlotDescriptor(slot_id_); + if (slot_desc == NULL) { + // TODO: create macro MAKE_ERROR() that returns a stream + stringstream error; + error << "couldn't resolve slot descriptor " << slot_id_; + LOG(INFO) << error.str(); + return Status(error.str()); + } + tuple_idx_ = row_desc.GetTupleIdx(slot_desc->parent()->id()); + if (tuple_idx_ == RowDescriptor::INVALID_IDX) { + TupleDescriptor* d = + state->desc_tbl().GetTupleDescriptor(slot_desc->parent()->id()); + LOG(INFO) << "invalid idx: " << slot_desc->DebugString() + << "\nparent=" << d->DebugString() + << "\nrow=" << row_desc.DebugString(); + stringstream error; + error << "invalid tuple_idx"; + return Status(error.str()); + } + DCHECK(tuple_idx_ != RowDescriptor::INVALID_IDX); + tuple_is_nullable_ = row_desc.TupleIsNullable(tuple_idx_); + slot_offset_ = slot_desc->tuple_offset(); + null_indicator_offset_ = slot_desc->null_indicator_offset(); } - DCHECK(tuple_idx_ != RowDescriptor::INVALID_IDX); - tuple_is_nullable_ = row_desc.TupleIsNullable(tuple_idx_); - slot_offset_ = slot_desc->tuple_offset(); - null_indicator_offset_ = slot_desc->null_indicator_offset(); return Status::OK(); } @@ -111,7 +112,7 @@ string SlotRef::DebugString() const { << " tuple_idx=" << tuple_idx_ << " slot_offset=" << slot_offset_ << " tuple_is_nullable=" << tuple_is_nullable_ << " null_indicator=" << null_indicator_offset_ - << Expr::DebugString() << ")"; + << ScalarExpr::DebugString() << ")"; return out.str(); } @@ -185,7 +186,7 @@ Status SlotRef::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn) LLVMContext& context = codegen->context(); Value* args[2]; - *fn = CreateIrFunctionPrototype(codegen, "GetSlotRef", &args); + *fn = CreateIrFunctionPrototype("GetSlotRef", codegen, &args); Value* row_ptr = args[1]; Value* tuple_offset = ConstantInt::get(codegen->int_type(), tuple_idx_); @@ -358,61 +359,70 @@ Status SlotRef::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn) } *fn = codegen->FinalizeFunction(*fn); - codegen->RegisterExprFn(unique_slot_id, *fn); + if (UNLIKELY(*fn == NULL)) return Status(TErrorCode::IR_VERIFY_FAILED, "SlotRef"); ir_compute_fn_ = *fn; + codegen->RegisterExprFn(unique_slot_id, *fn); return Status::OK(); } -BooleanVal SlotRef::GetBooleanVal(ExprContext* context, const TupleRow* row) { +BooleanVal SlotRef::GetBooleanVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_BOOLEAN); Tuple* t = 
row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return BooleanVal::null(); return BooleanVal(*reinterpret_cast<bool*>(t->GetSlot(slot_offset_))); } -TinyIntVal SlotRef::GetTinyIntVal(ExprContext* context, const TupleRow* row) { +TinyIntVal SlotRef::GetTinyIntVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_TINYINT); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return TinyIntVal::null(); return TinyIntVal(*reinterpret_cast<int8_t*>(t->GetSlot(slot_offset_))); } -SmallIntVal SlotRef::GetSmallIntVal(ExprContext* context, const TupleRow* row) { +SmallIntVal SlotRef::GetSmallIntVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_SMALLINT); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return SmallIntVal::null(); return SmallIntVal(*reinterpret_cast<int16_t*>(t->GetSlot(slot_offset_))); } -IntVal SlotRef::GetIntVal(ExprContext* context, const TupleRow* row) { +IntVal SlotRef::GetIntVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_INT); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return IntVal::null(); return IntVal(*reinterpret_cast<int32_t*>(t->GetSlot(slot_offset_))); } -BigIntVal SlotRef::GetBigIntVal(ExprContext* context, const TupleRow* row) { +BigIntVal SlotRef::GetBigIntVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_BIGINT); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return BigIntVal::null(); return BigIntVal(*reinterpret_cast<int64_t*>(t->GetSlot(slot_offset_))); } -FloatVal SlotRef::GetFloatVal(ExprContext* context, const TupleRow* row) { +FloatVal SlotRef::GetFloatVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_FLOAT); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return FloatVal::null(); return FloatVal(*reinterpret_cast<float*>(t->GetSlot(slot_offset_))); } -DoubleVal SlotRef::GetDoubleVal(ExprContext* context, const TupleRow* row) { +DoubleVal SlotRef::GetDoubleVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_DOUBLE); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return DoubleVal::null(); return DoubleVal(*reinterpret_cast<double*>(t->GetSlot(slot_offset_))); } -StringVal SlotRef::GetStringVal(ExprContext* context, const TupleRow* row) { +StringVal SlotRef::GetStringVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK(type_.IsStringType()); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return StringVal::null(); @@ -428,7 +438,8 @@ StringVal SlotRef::GetStringVal(ExprContext* context, const TupleRow* row) { return result; } -TimestampVal SlotRef::GetTimestampVal(ExprContext* context, const TupleRow* row) { +TimestampVal SlotRef::GetTimestampVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_TIMESTAMP); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return TimestampVal::null(); @@ -438,7 +449,8 @@ TimestampVal SlotRef::GetTimestampVal(ExprContext* context, const TupleRow* row) return result; } -DecimalVal SlotRef::GetDecimalVal(ExprContext* context, const TupleRow* row) { +DecimalVal SlotRef::GetDecimalVal( 
+ ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK_EQ(type_.type, TYPE_DECIMAL); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return DecimalVal::null(); @@ -455,7 +467,8 @@ DecimalVal SlotRef::GetDecimalVal(ExprContext* context, const TupleRow* row) { } } -CollectionVal SlotRef::GetCollectionVal(ExprContext* context, const TupleRow* row) { +CollectionVal SlotRef::GetCollectionVal( + ScalarExprEvaluator* eval, const TupleRow* row) const { DCHECK(type_.IsCollectionType()); Tuple* t = row->GetTuple(tuple_idx_); if (t == NULL || t->IsNull(null_indicator_offset_)) return CollectionVal::null(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/slot-ref.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/slot-ref.h b/be/src/exprs/slot-ref.h index e86617d..82d25ef 100644 --- a/be/src/exprs/slot-ref.h +++ b/be/src/exprs/slot-ref.h @@ -18,13 +18,24 @@ #ifndef IMPALA_EXPRS_SLOTREF_H #define IMPALA_EXPRS_SLOTREF_H -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "runtime/descriptors.h" namespace impala { +using impala_udf::BooleanVal; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; +using impala_udf::FloatVal; +using impala_udf::DoubleVal; +using impala_udf::TimestampVal; +using impala_udf::StringVal; +using impala_udf::DecimalVal; + /// Reference to a single slot of a tuple. -class SlotRef : public Expr { +class SlotRef : public ScalarExpr { public: SlotRef(const TExprNode& node); SlotRef(const SlotDescriptor* desc); @@ -36,27 +47,36 @@ class SlotRef : public Expr { /// Used for testing. GetValue will return tuple + offset interpreted as 'type' SlotRef(const ColumnType& type, int offset, const bool nullable = false); - virtual Status Prepare( - RuntimeState* state, const RowDescriptor& row_desc, ExprContext* context); - virtual std::string DebugString() const; - virtual int GetSlotIds(std::vector<SlotId>* slot_ids) const; + /// Exposed as public so AGG node can initialize its build expressions. 
+ virtual Status Init(const RowDescriptor& row_desc, RuntimeState* state) + override WARN_UNUSED_RESULT; + virtual std::string DebugString() const override; + virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn) + override WARN_UNUSED_RESULT; + virtual bool IsSlotRef() const override { return true; } + virtual int GetSlotIds(std::vector<SlotId>* slot_ids) const override; const SlotId& slot_id() const { return slot_id_; } - virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn); + protected: + friend class ScalarExpr; + friend class ScalarExprEvaluator; - virtual impala_udf::BooleanVal GetBooleanVal(ExprContext* context, const TupleRow*); - virtual impala_udf::TinyIntVal GetTinyIntVal(ExprContext* context, const TupleRow*); - virtual impala_udf::SmallIntVal GetSmallIntVal(ExprContext* context, const TupleRow*); - virtual impala_udf::IntVal GetIntVal(ExprContext* context, const TupleRow*); - virtual impala_udf::BigIntVal GetBigIntVal(ExprContext* context, const TupleRow*); - virtual impala_udf::FloatVal GetFloatVal(ExprContext* context, const TupleRow*); - virtual impala_udf::DoubleVal GetDoubleVal(ExprContext* context, const TupleRow*); - virtual impala_udf::StringVal GetStringVal(ExprContext* context, const TupleRow*); - virtual impala_udf::TimestampVal GetTimestampVal(ExprContext* context, const TupleRow*); - virtual impala_udf::DecimalVal GetDecimalVal(ExprContext* context, const TupleRow*); - virtual impala_udf::CollectionVal GetCollectionVal(ExprContext* context, const TupleRow*); + virtual BooleanVal GetBooleanVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual TinyIntVal GetTinyIntVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual SmallIntVal GetSmallIntVal( + ScalarExprEvaluator*, const TupleRow*) const override; + virtual IntVal GetIntVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual BigIntVal GetBigIntVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual FloatVal GetFloatVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual DoubleVal GetDoubleVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual StringVal GetStringVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual TimestampVal GetTimestampVal( + ScalarExprEvaluator*, const TupleRow*) const override; + virtual DecimalVal GetDecimalVal(ScalarExprEvaluator*, const TupleRow*) const override; + virtual CollectionVal GetCollectionVal( + ScalarExprEvaluator*, const TupleRow*) const override; - protected: + private: int tuple_idx_; // within row int slot_offset_; // within tuple NullIndicatorOffset null_indicator_offset_; // within tuple http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/string-functions-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/string-functions-ir.cc b/be/src/exprs/string-functions-ir.cc index 0a05739..09e19fb 100644 --- a/be/src/exprs/string-functions-ir.cc +++ b/be/src/exprs/string-functions-ir.cc @@ -26,7 +26,7 @@ #include <boost/static_assert.hpp> #include "exprs/anyval-util.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "runtime/string-value.inline.h" #include "runtime/tuple-row.h" #include "util/bit-util.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/string-functions.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/string-functions.h 
b/be/src/exprs/string-functions.h index 3dad5f6..86fc547 100644 --- a/be/src/exprs/string-functions.h +++ b/be/src/exprs/string-functions.h @@ -28,6 +28,19 @@ using namespace impala_udf; namespace impala { +using impala_udf::FunctionContext; +using impala_udf::AnyVal; +using impala_udf::BooleanVal; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; +using impala_udf::FloatVal; +using impala_udf::DoubleVal; +using impala_udf::TimestampVal; +using impala_udf::StringVal; +using impala_udf::DecimalVal; + class Expr; class OpcodeRegistry; class TupleRow; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/timestamp-functions.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/timestamp-functions.h b/be/src/exprs/timestamp-functions.h index b02e7bd..cbd8b98 100644 --- a/be/src/exprs/timestamp-functions.h +++ b/be/src/exprs/timestamp-functions.h @@ -22,10 +22,21 @@ #include "common/status.h" #include "udf/udf.h" -using namespace impala_udf; - namespace impala { +using impala_udf::FunctionContext; +using impala_udf::AnyVal; +using impala_udf::BooleanVal; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; +using impala_udf::FloatVal; +using impala_udf::DoubleVal; +using impala_udf::TimestampVal; +using impala_udf::StringVal; +using impala_udf::DecimalVal; + class Expr; class OpcodeRegistry; struct StringValue; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/tuple-is-null-predicate.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/tuple-is-null-predicate.cc b/be/src/exprs/tuple-is-null-predicate.cc index dd20e8a..2762247 100644 --- a/be/src/exprs/tuple-is-null-predicate.cc +++ b/be/src/exprs/tuple-is-null-predicate.cc @@ -27,7 +27,8 @@ namespace impala { -BooleanVal TupleIsNullPredicate::GetBooleanVal(ExprContext* ctx, const TupleRow* row) { +BooleanVal TupleIsNullPredicate::GetBooleanVal( + ScalarExprEvaluator* evaluator, const TupleRow* row) const { int count = 0; for (int i = 0; i < tuple_idxs_.size(); ++i) { count += row->GetTuple(tuple_idxs_[i]) == NULL; @@ -41,9 +42,8 @@ TupleIsNullPredicate::TupleIsNullPredicate(const TExprNode& node) node.tuple_is_null_pred.tuple_ids.end()) { } -Status TupleIsNullPredicate::Prepare(RuntimeState* state, const RowDescriptor& row_desc, - ExprContext* ctx) { - RETURN_IF_ERROR(Expr::Prepare(state, row_desc, ctx)); +Status TupleIsNullPredicate::Init(const RowDescriptor& row_desc, RuntimeState* state) { + RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state)); DCHECK_EQ(0, children_.size()); // Resolve tuple ids to tuple indexes. for (int i = 0; i < tuple_ids_.size(); ++i) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/tuple-is-null-predicate.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/tuple-is-null-predicate.h b/be/src/exprs/tuple-is-null-predicate.h index e687010..611f7e6 100644 --- a/be/src/exprs/tuple-is-null-predicate.h +++ b/be/src/exprs/tuple-is-null-predicate.h @@ -32,16 +32,16 @@ class TExprNode; /// TODO: Implement codegen to eliminate overhead on non-nullable tuples. 
class TupleIsNullPredicate: public Predicate { protected: - friend class Expr; + friend class ScalarExpr; TupleIsNullPredicate(const TExprNode& node); - virtual Status Prepare(RuntimeState* state, const RowDescriptor& row_desc, - ExprContext* ctx); - virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn); - virtual std::string DebugString() const; + virtual Status Init(const RowDescriptor& row_desc, RuntimeState* state) override; + virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn) + override WARN_UNUSED_RESULT; + virtual std::string DebugString() const override; - virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow* row); + virtual BooleanVal GetBooleanVal(ScalarExprEvaluator*, const TupleRow*) const override; private: /// Tuple ids to check for NULL. May contain ids of nullable and non-nullable tuples. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/udf-builtins.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/udf-builtins.h b/be/src/exprs/udf-builtins.h index 3bbc5d2..2ef439b 100644 --- a/be/src/exprs/udf-builtins.h +++ b/be/src/exprs/udf-builtins.h @@ -20,10 +20,20 @@ #include "udf/udf.h" -using namespace impala_udf; - namespace impala { +using impala_udf::FunctionContext; +using impala_udf::BooleanVal; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; +using impala_udf::FloatVal; +using impala_udf::DoubleVal; +using impala_udf::TimestampVal; +using impala_udf::StringVal; +using impala_udf::DecimalVal; + /// Builtins written against the UDF interface. The builtins in the other files /// should be replaced to the UDF interface as well. 
/// This is just to illustrate how builtins against the UDF interface will be http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/utility-functions.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/utility-functions.h b/be/src/exprs/utility-functions.h index a8dbc7b..41409c4 100644 --- a/be/src/exprs/utility-functions.h +++ b/be/src/exprs/utility-functions.h @@ -21,10 +21,21 @@ #include "udf/udf.h" -using namespace impala_udf; - namespace impala { +using impala_udf::FunctionContext; +using impala_udf::AnyVal; +using impala_udf::BooleanVal; +using impala_udf::TinyIntVal; +using impala_udf::SmallIntVal; +using impala_udf::IntVal; +using impala_udf::BigIntVal; +using impala_udf::FloatVal; +using impala_udf::DoubleVal; +using impala_udf::TimestampVal; +using impala_udf::StringVal; +using impala_udf::DecimalVal; + class Expr; class OpcodeRegistry; class TupleRow; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index cd3a741..4e2aca8 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -18,6 +18,8 @@ #include "runtime/coordinator-backend-state.h" #include <sstream> +#include <string> +#include <boost/lexical_cast.hpp> #include <boost/thread/locks.hpp> #include <boost/thread/lock_guard.hpp> #include <boost/accumulators/accumulators.hpp> @@ -41,6 +43,7 @@ #include "gen-cpp/Types_types.h" #include "gen-cpp/ImpalaInternalService_types.h" #include "gen-cpp/ImpalaInternalService_constants.h" + #include "common/names.h" using namespace impala; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 3c5ffbe..2bfe1b5 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -587,7 +587,8 @@ Status Coordinator::FinalizeSuccessfulInsert() { // TODO: add DescriptorTbl::CreateTableDescriptor() so we can create a // descriptor for just the output table, calling Create() can be very // expensive. - DescriptorTbl::Create(obj_pool(), query_ctx_.desc_tbl, nullptr, &descriptor_table); + RETURN_IF_ERROR( + DescriptorTbl::Create(obj_pool(), query_ctx_.desc_tbl, &descriptor_table)); HdfsTableDescriptor* hdfs_table = static_cast<HdfsTableDescriptor*>( descriptor_table->GetTableDescriptor(finalize_params_.table_id)); DCHECK(hdfs_table != nullptr) @@ -697,6 +698,11 @@ Status Coordinator::FinalizeSuccessfulInsert() { } } + // Release resources on the descriptor table. 
+ descriptor_table->ReleaseResources(); + descriptor_table = nullptr; + hdfs_table = nullptr; + { SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer", "FinalizationTimer")); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 0d772eb..e3106b0 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -53,8 +53,6 @@ class RowBatch; class RowDescriptor; class ObjectPool; class RuntimeState; -class Expr; -class ExprContext; class ExecEnv; class TUpdateCatalogRequest; class TQueryExecRequest; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/data-stream-sender.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc index dc98c49..a45c2c4 100644 --- a/be/src/runtime/data-stream-sender.cc +++ b/be/src/runtime/data-stream-sender.cc @@ -21,8 +21,8 @@ #include <thrift/protocol/TDebugProtocol.h> #include "common/logging.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "gutil/strings/substitute.h" #include "runtime/descriptors.h" #include "runtime/tuple-row.h" @@ -325,7 +325,7 @@ void DataStreamSender::Channel::Teardown(RuntimeState* state) { batch_.reset(); } -DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, +DataStreamSender::DataStreamSender(int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, const vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size) @@ -351,8 +351,8 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, for (int i = 0; i < destinations.size(); ++i) { channels_.push_back( new Channel(this, row_desc, destinations[i].server, - destinations[i].fragment_instance_id, - sink.dest_node_id, per_channel_buffer_size)); + destinations[i].fragment_instance_id, sink.dest_node_id, + per_channel_buffer_size)); } if (partition_type_ == TPartitionType::UNPARTITIONED @@ -361,14 +361,6 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, srand(reinterpret_cast<uint64_t>(this)); random_shuffle(channels_.begin(), channels_.end()); } - - if (partition_type_ == TPartitionType::HASH_PARTITIONED - || partition_type_ == TPartitionType::KUDU) { - // TODO: move this to Init()? 
would need to save 'sink' somewhere - Status status = Expr::CreateExprTrees( - pool, sink.output_partition.partition_exprs, &partition_expr_ctxs_); - DCHECK(status.ok()); - } } string DataStreamSender::GetName() { @@ -383,14 +375,23 @@ DataStreamSender::~DataStreamSender() { } } +Status DataStreamSender::Init(const vector<TExpr>& thrift_output_exprs, + const TDataSink& tsink, RuntimeState* state) { + DCHECK(tsink.__isset.stream_sink); + if (partition_type_ == TPartitionType::HASH_PARTITIONED || + partition_type_ == TPartitionType::KUDU) { + RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs, + row_desc_, state, &partition_exprs_)); + } + return Status::OK(); +} + Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker)); state_ = state; SCOPED_TIMER(profile_->total_time_counter()); - - RETURN_IF_ERROR( - Expr::Prepare(partition_expr_ctxs_, state, row_desc_, mem_tracker_.get())); - + RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state, + state->obj_pool(), expr_mem_pool(), &partition_expr_evals_)); bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES); uncompressed_bytes_counter_ = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); @@ -414,7 +415,7 @@ Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* parent_mem_tra } Status DataStreamSender::Open(RuntimeState* state) { - return Expr::Open(partition_expr_ctxs_, state); + return ScalarExprEvaluator::Open(partition_expr_evals_, state); } Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) { @@ -422,8 +423,7 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) { DCHECK(!flushed_); if (batch->num_rows() == 0) return Status::OK(); - if (partition_type_ == TPartitionType::UNPARTITIONED - || channels_.size() == 1) { + if (partition_type_ == TPartitionType::UNPARTITIONED || channels_.size() == 1) { // current_thrift_batch_ is *not* the one that was written by the last call // to Serialize() RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, channels_.size())); @@ -443,12 +443,12 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) { RETURN_IF_ERROR(current_channel->SendBatch(current_channel->thrift_batch())); current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size(); } else if (partition_type_ == TPartitionType::KUDU) { - DCHECK_EQ(partition_expr_ctxs_.size(), 1); + DCHECK_EQ(partition_expr_evals_.size(), 1); int num_channels = channels_.size(); for (int i = 0; i < batch->num_rows(); ++i) { TupleRow* row = batch->GetRow(i); int32_t partition = - *reinterpret_cast<int32_t*>(partition_expr_ctxs_[0]->GetValue(row)); + *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row)); if (partition < 0) { // This row doesn't coorespond to a partition, e.g. it's outside the given ranges. 
partition = next_unknown_partition_; @@ -465,17 +465,18 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) { for (int i = 0; i < batch->num_rows(); ++i) { TupleRow* row = batch->GetRow(i); uint32_t hash_val = HashUtil::FNV_SEED; - for (int i = 0; i < partition_expr_ctxs_.size(); ++i) { - ExprContext* ctx = partition_expr_ctxs_[i]; - void* partition_val = ctx->GetValue(row); + for (int i = 0; i < partition_exprs_.size(); ++i) { + ScalarExprEvaluator* eval = partition_expr_evals_[i]; + void* partition_val = eval->GetValue(row); // We can't use the crc hash function here because it does not result // in uncorrelated hashes with different seeds. Instead we must use // fnv hash. // TODO: fix crc hash/GetHashValue() - hash_val = - RawValue::GetHashValueFnv(partition_val, ctx->root()->type(), hash_val); + DCHECK(&partition_expr_evals_[i]->root() == partition_exprs_[i]); + hash_val = RawValue::GetHashValueFnv( + partition_val, partition_exprs_[i]->type(), hash_val); } - ExprContext::FreeLocalAllocations(partition_expr_ctxs_); + ScalarExprEvaluator::FreeLocalAllocations(partition_expr_evals_); RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row)); } } @@ -502,7 +503,8 @@ void DataStreamSender::Close(RuntimeState* state) { for (int i = 0; i < channels_.size(); ++i) { channels_[i]->Teardown(state); } - Expr::Close(partition_expr_ctxs_, state); + ScalarExprEvaluator::Close(partition_expr_evals_, state); + ScalarExpr::Close(partition_exprs_); DataSink::Close(state); closed_ = true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/data-stream-sender.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h index d157577..0719daf 100644 --- a/be/src/runtime/data-stream-sender.h +++ b/be/src/runtime/data-stream-sender.h @@ -31,10 +31,9 @@ namespace impala { -class Expr; -class ExprContext; class RowBatch; class RowDescriptor; + class MemTracker; class TDataStreamSink; class TNetworkAddress; @@ -47,6 +46,7 @@ class TPlanFragmentDestination; // /// TODO: capture stats that describe distribution of rows/data volume /// across channels. +/// TODO: create a PlanNode equivalent class for DataSink. class DataStreamSender : public DataSink { public: /// Construct a sender according to the output specification (sink), @@ -57,10 +57,11 @@ class DataStreamSender : public DataSink { /// The RowDescriptor must live until Close() is called. /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED, /// and RANDOM. - DataStreamSender(ObjectPool* pool, int sender_id, - const RowDescriptor& row_desc, const TDataStreamSink& sink, - const std::vector<TPlanFragmentDestination>& destinations, - int per_channel_buffer_size); + DataStreamSender(int sender_id, const RowDescriptor& row_desc, + const TDataStreamSink& tsink, + const std::vector<TPlanFragmentDestination>& destinations, + int per_channel_buffer_size); + virtual ~DataStreamSender(); virtual std::string GetName(); @@ -98,6 +99,12 @@ class DataStreamSender : public DataSink { /// broadcast to multiple receivers, they are counted once per receiver. 
int64_t GetNumDataBytesSent() const; + protected: + friend class DataStreamTest; + + virtual Status Init(const std::vector<TExpr>& thrift_output_exprs, + const TDataSink& tsink, RuntimeState* state); + private: class Channel; @@ -120,9 +127,13 @@ class DataStreamSender : public DataSink { TRowBatch thrift_batch2_; TRowBatch* current_thrift_batch_; // the next one to fill in Send() - std::vector<ExprContext*> partition_expr_ctxs_; // compute per-row partition values std::vector<Channel*> channels_; + /// Expressions of partition keys. It's used to compute the + /// per-row partition values for shuffling exchange; + std::vector<ScalarExpr*> partition_exprs_; + std::vector<ScalarExprEvaluator*> partition_expr_evals_; + RuntimeProfile::Counter* serialize_batch_timer_; /// The concurrent wall time spent sending data over the network. RuntimeProfile::ConcurrentTimerCounter* thrift_transmit_timer_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/data-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index 4cb2bf0..1048fb6 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -113,6 +113,7 @@ class DataStreamTest : public testing::Test { // Initialize MemTrackers and RuntimeState for use by the data stream receiver. exec_env_.InitForFeTests(); runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_)); + mem_pool_.reset(new MemPool(&tracker_)); // Stop tests that rely on mismatched sender / receiver pairs timing out from failing. FLAGS_datastream_sender_timeout_ms = 250; @@ -121,6 +122,9 @@ class DataStreamTest : public testing::Test { virtual void SetUp() { CreateRowDesc(); + + is_asc_.push_back(true); + nulls_first_.push_back(true); CreateTupleComparator(); next_instance_id_.lo = 0; @@ -156,20 +160,29 @@ class DataStreamTest : public testing::Test { StartBackend(); } - const TDataStreamSink& GetSink(TPartitionType::type partition_type) { + const TDataSink GetSink(TPartitionType::type partition_type) { + TDataSink tdata_sink; switch (partition_type) { - case TPartitionType::UNPARTITIONED: return broadcast_sink_; - case TPartitionType::RANDOM: return random_sink_; - case TPartitionType::HASH_PARTITIONED: return hash_sink_; - default: EXPECT_TRUE(false) << "Unhandled sink type: " << partition_type; + case TPartitionType::UNPARTITIONED: + tdata_sink.__set_stream_sink(broadcast_sink_); + break; + case TPartitionType::RANDOM: + tdata_sink.__set_stream_sink(random_sink_); + break; + case TPartitionType::HASH_PARTITIONED: + tdata_sink.__set_stream_sink(hash_sink_); + break; + default: + EXPECT_TRUE(false) << "Unhandled sink type: " << partition_type; } - // Should never reach this. 
- return broadcast_sink_; + return tdata_sink; } virtual void TearDown() { - lhs_slot_ctx_->Close(NULL); - rhs_slot_ctx_->Close(NULL); + desc_tbl_->ReleaseResources(); + less_than_->Close(runtime_state_.get()); + ScalarExpr::Close(ordering_exprs_); + mem_pool_->FreeAll(); exec_env_.impalad_client_cache()->TestShutdown(); StopBackend(); } @@ -182,13 +195,18 @@ class DataStreamTest : public testing::Test { ObjectPool obj_pool_; MemTracker tracker_; + scoped_ptr<MemPool> mem_pool_; DescriptorTbl* desc_tbl_; const RowDescriptor* row_desc_; + vector<bool> is_asc_; + vector<bool> nulls_first_; TupleRowComparator* less_than_; ExecEnv exec_env_; scoped_ptr<RuntimeState> runtime_state_; TUniqueId next_instance_id_; string stmt_; + // The sorting expression for the single BIGINT column. + vector<ScalarExpr*> ordering_exprs_; // RowBatch generation scoped_ptr<RowBatch> batch_; @@ -210,7 +228,7 @@ class DataStreamTest : public testing::Test { Status status; int num_bytes_sent; - SenderInfo(): thread_handle(NULL), num_bytes_sent(0) {} + SenderInfo(): thread_handle(nullptr), num_bytes_sent(0) {} }; vector<SenderInfo> sender_info_; @@ -229,7 +247,7 @@ class DataStreamTest : public testing::Test { : stream_type(stream_type), num_senders(num_senders), receiver_num(receiver_num), - thread_handle(NULL), + thread_handle(nullptr), num_rows_received(0) {} ~ReceiverInfo() { @@ -271,7 +289,7 @@ class DataStreamTest : public testing::Test { slot_desc.__set_nullIndicatorBit(-1); slot_desc.__set_slotIdx(0); thrift_desc_tbl.slotDescriptors.push_back(slot_desc); - EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, nullptr, &desc_tbl_)); + EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, &desc_tbl_)); vector<TTupleId> row_tids; row_tids.push_back(0); @@ -283,19 +301,11 @@ class DataStreamTest : public testing::Test { // Create a tuple comparator to sort in ascending order on the single bigint column. void CreateTupleComparator() { SlotRef* lhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0)); - lhs_slot_ctx_ = obj_pool_.Add(new ExprContext(lhs_slot)); - SlotRef* rhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0)); - rhs_slot_ctx_ = obj_pool_.Add(new ExprContext(rhs_slot)); - - lhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_); - rhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_); - lhs_slot_ctx_->Open(NULL); - rhs_slot_ctx_->Open(NULL); - SortExecExprs* sort_exprs = obj_pool_.Add(new SortExecExprs()); - sort_exprs->Init( - vector<ExprContext*>(1, lhs_slot_ctx_), vector<ExprContext*>(1, rhs_slot_ctx_)); - less_than_ = obj_pool_.Add(new TupleRowComparator( - *sort_exprs, vector<bool>(1, true), vector<bool>(1, false))); + ASSERT_OK(lhs_slot->Init(RowDescriptor(), runtime_state_.get())); + ordering_exprs_.push_back(lhs_slot); + less_than_ = obj_pool_.Add(new TupleRowComparator(ordering_exprs_, + is_asc_, nulls_first_)); + less_than_->Open(&obj_pool_, runtime_state_.get(), mem_pool_.get()); } // Create batch_, but don't fill it with data yet. Assumes we created row_desc_. @@ -323,7 +333,7 @@ class DataStreamTest : public testing::Test { // Start receiver (expecting given number of senders) in separate thread. 
void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num, - int buffer_size, bool is_merging, TUniqueId* out_id = NULL) { + int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) { VLOG_QUERY << "start receiver"; RuntimeProfile* profile = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "TestReceiver")); @@ -339,7 +349,7 @@ class DataStreamTest : public testing::Test { info.thread_handle = new thread(&DataStreamTest::ReadStreamMerging, this, &info, profile); } - if (out_id != NULL) *out_id = instance_id; + if (out_id != nullptr) *out_id = instance_id; } void JoinReceivers() { @@ -355,7 +365,7 @@ class DataStreamTest : public testing::Test { RowBatch* batch; VLOG_QUERY << "start reading"; while (!(info->status = info->stream_recvr->GetBatch(&batch)).IsCancelled() && - (batch != NULL)) { + (batch != nullptr)) { VLOG_QUERY << "read batch #rows=" << batch->num_rows(); for (int i = 0; i < batch->num_rows(); ++i) { TupleRow* row = batch->GetRow(i); @@ -444,7 +454,7 @@ class DataStreamTest : public testing::Test { void StartBackend() { boost::shared_ptr<ImpalaTestBackend> handler(new ImpalaTestBackend(stream_mgr_)); boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler)); - server_ = new ThriftServer("DataStreamTest backend", processor, FLAGS_port, NULL); + server_ = new ThriftServer("DataStreamTest backend", processor, FLAGS_port, nullptr); server_->Start(); } @@ -477,9 +487,16 @@ class DataStreamTest : public testing::Test { int sender_num, int channel_buffer_size, TPartitionType::type partition_type) { RuntimeState state(TQueryCtx(), &exec_env_, desc_tbl_); VLOG_QUERY << "create sender " << sender_num; - const TDataStreamSink& sink = GetSink(partition_type); + const TDataSink& sink = GetSink(partition_type); DataStreamSender sender( - &obj_pool_, sender_num, *row_desc_, sink, dest_, channel_buffer_size); + sender_num, *row_desc_, sink.stream_sink, dest_, channel_buffer_size); + + TExprNode expr_node; + expr_node.node_type = TExprNodeType::SLOT_REF; + TExpr output_exprs; + output_exprs.nodes.push_back(expr_node); + EXPECT_OK(sender.Init(vector<TExpr>({output_exprs}), sink, &state)); + EXPECT_OK(sender.Prepare(&state, &tracker_)); EXPECT_OK(sender.Open(&state)); scoped_ptr<RowBatch> batch(CreateRowBatch()); @@ -517,10 +534,6 @@ class DataStreamTest : public testing::Test { JoinReceivers(); CheckReceivers(stream_type, num_senders); } - - private: - ExprContext* lhs_slot_ctx_; - ExprContext* rhs_slot_ctx_; }; TEST_F(DataStreamTest, UnknownSenderSmallResult) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/descriptors.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc index bb7c1d0..ad66c56 100644 --- a/be/src/runtime/descriptors.cc +++ b/be/src/runtime/descriptors.cc @@ -27,9 +27,11 @@ #include "codegen/llvm-codegen.h" #include "common/object-pool.h" +#include "common/status.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "gen-cpp/Descriptors_types.h" #include "gen-cpp/PlanNodes_types.h" -#include "exprs/expr.h" #include "runtime/runtime-state.h" #include "common/names.h" @@ -173,25 +175,17 @@ string TableDescriptor::DebugString() const { return out.str(); } -HdfsPartitionDescriptor::HdfsPartitionDescriptor(const THdfsTable& thrift_table, - const THdfsPartition& thrift_partition, ObjectPool* pool) +HdfsPartitionDescriptor::HdfsPartitionDescriptor( + const 
THdfsTable& thrift_table, const THdfsPartition& thrift_partition) : line_delim_(thrift_partition.lineDelim), field_delim_(thrift_partition.fieldDelim), collection_delim_(thrift_partition.collectionDelim), escape_char_(thrift_partition.escapeChar), block_size_(thrift_partition.blockSize), id_(thrift_partition.id), - file_format_(thrift_partition.fileFormat), - object_pool_(pool) { + thrift_partition_key_exprs_(thrift_partition.partitionKeyExprs), + file_format_(thrift_partition.fileFormat) { DecompressLocation(thrift_table, thrift_partition, &location_); - for (int i = 0; i < thrift_partition.partitionKeyExprs.size(); ++i) { - ExprContext* ctx; - // TODO: Move to dedicated Init method and treat Status return correctly - Status status = Expr::CreateExprTree(object_pool_, - thrift_partition.partitionKeyExprs[i], &ctx); - DCHECK(status.ok()); - partition_key_value_ctxs_.push_back(ctx); - } } string HdfsPartitionDescriptor::DebugString() const { @@ -210,17 +204,14 @@ string DataSourceTableDescriptor::DebugString() const { return out.str(); } -HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, - ObjectPool* pool) +HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool) : TableDescriptor(tdesc), hdfs_base_dir_(tdesc.hdfsTable.hdfsBaseDir), null_partition_key_value_(tdesc.hdfsTable.nullPartitionKeyValue), - null_column_value_(tdesc.hdfsTable.nullColumnValue), - object_pool_(pool) { + null_column_value_(tdesc.hdfsTable.nullColumnValue) { for (const auto& entry : tdesc.hdfsTable.partitions) { HdfsPartitionDescriptor* partition = - new HdfsPartitionDescriptor(tdesc.hdfsTable, entry.second, pool); - object_pool_->Add(partition); + pool->Add(new HdfsPartitionDescriptor(tdesc.hdfsTable, entry.second)); partition_descriptors_[entry.first] = partition; } avro_schema_ = tdesc.hdfsTable.__isset.avroSchema ? tdesc.hdfsTable.avroSchema : ""; @@ -479,8 +470,31 @@ string RowDescriptor::DebugString() const { return ss.str(); } +Status DescriptorTbl::CreatePartKeyExprs( + const HdfsTableDescriptor& hdfs_tbl, ObjectPool* pool) { + // Prepare and open partition exprs + for (const auto& part_entry : hdfs_tbl.partition_descriptors()) { + HdfsPartitionDescriptor* part_desc = part_entry.second; + vector<ScalarExpr*> partition_key_value_exprs; + RETURN_IF_ERROR(ScalarExpr::Create(part_desc->thrift_partition_key_exprs_, + RowDescriptor(), nullptr, pool, &partition_key_value_exprs)); + for (const ScalarExpr* partition_expr : partition_key_value_exprs) { + DCHECK(partition_expr->IsLiteral()); + DCHECK(!partition_expr->HasFnCtx()); + DCHECK_EQ(partition_expr->GetNumChildren(), 0); + } + // TODO: RowDescriptor should arguably be optional in Prepare for known literals. + // Partition exprs are not used in the codegen case. Don't codegen them. 
+ RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_value_exprs, nullptr, + pool, nullptr, &part_desc->partition_key_value_evals_)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open( + part_desc->partition_key_value_evals_, nullptr)); + } + return Status::OK(); +} + Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl, - MemTracker* mem_tracker, DescriptorTbl** tbl) { + DescriptorTbl** tbl) { *tbl = pool->Add(new DescriptorTbl()); // deserialize table descriptors first, they are being referenced by tuple descriptors for (size_t i = 0; i < thrift_tbl.tableDescriptors.size(); ++i) { @@ -489,21 +503,8 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tb switch (tdesc.tableType) { case TTableType::HDFS_TABLE: desc = pool->Add(new HdfsTableDescriptor(tdesc, pool)); - - if (mem_tracker != nullptr) { - // prepare and open partition exprs - const HdfsTableDescriptor* hdfs_tbl = - static_cast<const HdfsTableDescriptor*>(desc); - for (const auto& part_entry : hdfs_tbl->partition_descriptors()) { - // TODO: RowDescriptor should arguably be optional in Prepare for known - // literals Partition exprs are not used in the codegen case. Don't codegen - // them. - RETURN_IF_ERROR(Expr::Prepare(part_entry.second->partition_key_value_ctxs(), - nullptr, RowDescriptor(), mem_tracker)); - RETURN_IF_ERROR(Expr::Open( - part_entry.second->partition_key_value_ctxs(), nullptr)); - } - } + RETURN_IF_ERROR(CreatePartKeyExprs( + *static_cast<const HdfsTableDescriptor*>(desc), pool)); break; case TTableType::HBASE_TABLE: desc = pool->Add(new HBaseTableDescriptor(tdesc)); @@ -552,7 +553,11 @@ void DescriptorTbl::ReleaseResources() { const HdfsTableDescriptor* hdfs_tbl = static_cast<const HdfsTableDescriptor*>(entry.second); for (const auto& part_entry: hdfs_tbl->partition_descriptors()) { - Expr::Close(part_entry.second->partition_key_value_ctxs(), nullptr); + for (ScalarExprEvaluator* eval : + part_entry.second->partition_key_value_evals()) { + eval->Close(nullptr); + const_cast<ScalarExpr&>(eval->root()).Close(); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/descriptors.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 06c65c5..8a8dd7b 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -24,8 +24,8 @@ #include <boost/scoped_ptr.hpp> #include <ostream> -#include "common/status.h" #include "common/global-types.h" +#include "common/status.h" #include "runtime/types.h" #include "gen-cpp/Descriptors_types.h" // for TTupleId @@ -40,13 +40,12 @@ namespace llvm { namespace impala { -class Expr; -class ExprContext; class LlvmBuilder; class LlvmCodeGen; class ObjectPool; class RuntimeState; -class MemTracker; +class ScalarExpr; +class ScalarExprEvaluator; class TDescriptorTable; class TSlotDescriptor; class TTable; @@ -191,7 +190,7 @@ class SlotDescriptor { SlotDescriptor(const TSlotDescriptor& tdesc, const TupleDescriptor* parent, const TupleDescriptor* collection_item_descriptor); - /// Generate LLVM code at the insert position of 'builder' to get the i8 value of the + /// Generate LLVM code at the insert position of 'builder' to get the i8 value of /// the byte containing 'null_indicator_offset' in 'tuple'. If 'null_byte_ptr' is /// non-NULL, sets that to a pointer to the null byte. 
static llvm::Value* CodegenGetNullByte(LlvmCodeGen* codegen, LlvmBuilder* builder, @@ -249,7 +248,7 @@ class TableDescriptor { class HdfsPartitionDescriptor { public: HdfsPartitionDescriptor(const THdfsTable& thrift_table, - const THdfsPartition& thrift_partition, ObjectPool* pool); + const THdfsPartition& thrift_partition); char line_delim() const { return line_delim_; } char field_delim() const { return field_delim_; } @@ -261,14 +260,16 @@ class HdfsPartitionDescriptor { int64_t id() const { return id_; } std::string DebugString() const; - /// It is safe to evaluate the returned expr contexts concurrently from multiple + /// It is safe to call the returned expr evaluators concurrently from multiple /// threads because all exprs are literals, after the descriptor table has been /// opened. - const std::vector<ExprContext*>& partition_key_value_ctxs() const { - return partition_key_value_ctxs_; + const std::vector<ScalarExprEvaluator*>& partition_key_value_evals() const { + return partition_key_value_evals_; } private: + friend class DescriptorTbl; + char line_delim_; char field_delim_; char collection_delim_; @@ -284,14 +285,15 @@ class HdfsPartitionDescriptor { /// The Prepare()/Open()/Close() cycle is controlled by the containing descriptor table /// because the same partition descriptor may be used by multiple exec nodes with /// different lifetimes. - std::vector<ExprContext*> partition_key_value_ctxs_; + const std::vector<TExpr>& thrift_partition_key_exprs_; + + /// These evaluators are safe to be shared by all fragment instances as all expressions + /// are Literals. + /// TODO: replace this with vector<Literal> instead. + std::vector<ScalarExprEvaluator*> partition_key_value_evals_; /// The format (e.g. text, sequence file etc.) of data in the files in this partition THdfsFileFormat::type file_format_; - - /// For allocating expression objects in partition_key_values_ - /// Owned by DescriptorTbl, supplied in constructor. - ObjectPool* object_pool_; }; class HdfsTableDescriptor : public TableDescriptor { @@ -327,8 +329,6 @@ class HdfsTableDescriptor : public TableDescriptor { PartitionIdToDescriptorMap partition_descriptors_; /// Set to the table's Avro schema if this is an Avro table, empty string otherwise std::string avro_schema_; - /// Owned by DescriptorTbl - ObjectPool* object_pool_; }; class HBaseTableDescriptor : public TableDescriptor { @@ -461,13 +461,9 @@ class TupleDescriptor { class DescriptorTbl { public: /// Creates a descriptor tbl within 'pool' from thrift_tbl and returns it via 'tbl'. - /// If mem_tracker_ != nullptr, also opens partition exprs for hdfs tables (and does - /// memory allocation against that tracker). /// Returns OK on success, otherwise error (in which case 'tbl' will be unset). - /// TODO: when cleaning up ExprCtx, remove the need to pass in a memtracker for literal - /// exprs that don't require additional memory at runtime. static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl, - MemTracker* mem_tracker, DescriptorTbl** tbl); + DescriptorTbl** tbl) WARN_UNUSED_RESULT; /// Free memory allocated in Create(). void ReleaseResources(); @@ -491,6 +487,9 @@ class DescriptorTbl { SlotDescriptorMap slot_desc_map_; DescriptorTbl(): tbl_desc_map_(), tuple_desc_map_(), slot_desc_map_() {} + + static Status CreatePartKeyExprs( + const HdfsTableDescriptor& hdfs_tbl, ObjectPool* pool) WARN_UNUSED_RESULT; }; /// Records positions of tuples within row produced by ExecNode. 
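For reference, a minimal sketch of how a consumer reads the partition key literals through the new evaluator interface introduced above (placeholder names only: 'part_desc' and the loop variable are hypothetical, and evaluating against a null TupleRow is assumed to be safe because, per the comment in descriptors.h, all partition key exprs are literals):

  // Sketch: reading partition key values via the new ScalarExprEvaluator accessors.
  // 'part_desc' is a HdfsPartitionDescriptor obtained from the DescriptorTbl
  // (placeholder name, not from this patch).
  const std::vector<ScalarExprEvaluator*>& evals =
      part_desc->partition_key_value_evals();
  for (int i = 0; i < evals.size(); ++i) {
    // All partition key exprs are literals, so no TupleRow is needed.
    void* value = evals[i]->GetValue(nullptr);
    // The value's type comes from the evaluator's root expr.
    const ColumnType& type = evals[i]->root().type();
    // 'value' points at the literal's raw value of 'type'.
  }

This mirrors the old partition_key_value_ctxs() usage, with the evaluators created and opened once in DescriptorTbl::CreatePartKeyExprs() and closed in ReleaseResources().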
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/fragment-instance-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc index 5f109f5..2385eab 100644 --- a/be/src/runtime/fragment-instance-state.cc +++ b/be/src/runtime/fragment-instance-state.cc @@ -185,9 +185,8 @@ Status FragmentInstanceState::Prepare() { // prepare sink_ DCHECK(fragment_ctx_.fragment.__isset.output_sink); - RETURN_IF_ERROR( - DataSink::Create( - obj_pool(), fragment_ctx_, instance_ctx_, exec_tree_->row_desc(), &sink_)); + RETURN_IF_ERROR(DataSink::Create(fragment_ctx_, instance_ctx_, exec_tree_->row_desc(), + runtime_state_, &sink_)); RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker())); RuntimeProfile* sink_profile = sink_->profile(); if (sink_profile != nullptr) profile()->AddChild(sink_profile); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 90f0185..0ee2c12 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -266,8 +266,7 @@ void QueryState::StartFInstances() { // set up desc tbl DCHECK(query_ctx().__isset.desc_tbl); - Status status = DescriptorTbl::Create( - &obj_pool_, query_ctx().desc_tbl, query_mem_tracker_, &desc_tbl_); + Status status = DescriptorTbl::Create(&obj_pool_, query_ctx().desc_tbl, &desc_tbl_); if (!status.ok()) { instances_prepared_promise_.Set(status); ReportExecStatusAux(true, status, nullptr, false); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/runtime-filter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h index b953982..ab70d4a 100644 --- a/be/src/runtime/runtime-filter.h +++ b/be/src/runtime/runtime-filter.h @@ -90,8 +90,8 @@ class RuntimeFilter { /// compact way of representing a full Bloom filter that contains every element. BloomFilter* bloom_filter_; - /// Descriptor of the filter. - TRuntimeFilterDesc filter_desc_; + /// Reference to the filter's thrift descriptor in the thrift Plan tree. + const TRuntimeFilterDesc& filter_desc_; /// Time, in ms, that the filter was registered. 
int64_t registration_time_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index eb11072..1224345 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -29,7 +29,7 @@ #include "codegen/llvm-codegen.h" #include "common/object-pool.h" #include "common/status.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "exprs/scalar-fn-call.h" #include "runtime/buffered-block-mgr.h" #include "runtime/bufferpool/reservation-tracker.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/sorted-run-merger.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc index 8f8891a..36d7a61 100644 --- a/be/src/runtime/sorted-run-merger.cc +++ b/be/src/runtime/sorted-run-merger.cc @@ -12,7 +12,7 @@ // limitations under the License. #include "runtime/sorted-run-merger.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "runtime/descriptors.h" #include "runtime/row-batch.h" #include "runtime/sorter.h"
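Taken together, the call sites in this change all move to the same expr lifecycle. As a rough sketch (placeholder variable names; the Create()/Open()/Close() signatures are the ones used in data-stream-sender.cc above):

  // Sketch of the ScalarExpr / ScalarExprEvaluator lifecycle used by this change.
  // 'texprs', 'row_desc', 'state', 'pool' and 'mem_pool' are placeholders supplied
  // by the caller (e.g. a DataSink or ExecNode).
  std::vector<ScalarExpr*> exprs;
  RETURN_IF_ERROR(ScalarExpr::Create(texprs, row_desc, state, &exprs));

  std::vector<ScalarExprEvaluator*> evals;
  RETURN_IF_ERROR(ScalarExprEvaluator::Create(exprs, state, pool, mem_pool, &evals));
  RETURN_IF_ERROR(ScalarExprEvaluator::Open(evals, state));

  // Per-row evaluation:
  //   void* val = evals[i]->GetValue(row);

  // Teardown: close the evaluators first, then the exprs themselves.
  ScalarExprEvaluator::Close(evals, state);
  ScalarExpr::Close(exprs);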
