Repository: incubator-impala Updated Branches: refs/heads/master a3ce5b448 -> b38d9826d
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/sorter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index 0d41ec4..4c33d7f 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -77,7 +77,7 @@ static int NumNonNullBlocks(const vector<BufferedBlockMgr::Block*>& blocks) { /// an optional sequence of var-length blocks containing the var-length data. /// /// Runs are either "initial runs" constructed from the sorter's input by evaluating -/// the expressions in 'sort_tuple_slot_expr_ctxs_' or "intermediate runs" constructed +/// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed /// by merging already-sorted runs. Initial runs are sorted in-place in memory. Once /// sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by /// SortedRunMerger, either to produce the final sorted output or to produce another @@ -113,7 +113,7 @@ class Sorter::Run { /// number of rows actually added in 'num_processed'. If the run is full (no more blocks /// can be allocated), 'num_processed' may be less than the number of remaining rows in /// the batch. AddInputBatch() materializes the input rows using the expressions in - /// sorter_->sort_tuple_slot_expr_ctxs_, while AddIntermediateBatch() just copies rows. + /// sorter_->sort_tuple_expr_evals_, while AddIntermediateBatch() just copies rows. 
Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed) { DCHECK(initial_run_); if (has_var_len_slots_) { @@ -568,8 +568,9 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_ TupleRow* input_row = batch->GetRow(cur_input_index); Tuple* new_tuple = cur_fixed_len_block->Allocate<Tuple>(sort_tuple_size_); if (INITIAL_RUN) { - new_tuple->MaterializeExprs<HAS_VAR_LEN_SLOTS, true>(input_row, *sort_tuple_desc_, - sorter_->sort_tuple_slot_expr_ctxs_, NULL, &string_values, &total_var_len); + new_tuple->MaterializeExprs<HAS_VAR_LEN_SLOTS, true>(input_row, + *sort_tuple_desc_, sorter_->sort_tuple_expr_evals_, NULL, + &string_values, &total_var_len); if (total_var_len > sorter_->block_mgr_->max_block_size()) { return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, Substitute( "Variable length data in a single tuple larger than block size $0 > $1", @@ -1334,7 +1335,7 @@ inline void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tup } Sorter::Sorter(const TupleRowComparator& compare_less_than, - const vector<ExprContext*>& slot_materialize_expr_ctxs, + const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc, MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state) : state_(state), @@ -1343,7 +1344,7 @@ Sorter::Sorter(const TupleRowComparator& compare_less_than, block_mgr_(state->block_mgr()), block_mgr_client_(NULL), has_var_len_slots_(false), - sort_tuple_slot_expr_ctxs_(slot_materialize_expr_ctxs), + sort_tuple_exprs_(sort_tuple_exprs), mem_tracker_(mem_tracker), output_row_desc_(output_row_desc), unsorted_run_(NULL), @@ -1362,7 +1363,7 @@ Sorter::~Sorter() { DCHECK(merge_output_run_ == NULL); } -Status Sorter::Prepare() { +Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) { DCHECK(in_mem_tuple_sorter_ == NULL) << "Already prepared"; TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0]; has_var_len_slots_ = 
sort_tuple_desc->HasVarlenSlots(); @@ -1383,6 +1384,9 @@ Status Sorter::Prepare() { RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this), min_buffers_required, false, mem_tracker_, state_, &block_mgr_client_)); + + RETURN_IF_ERROR(ScalarExprEvaluator::Create(sort_tuple_exprs_, state_, obj_pool, + expr_mem_pool, &sort_tuple_expr_evals_)); return Status::OK(); } @@ -1392,9 +1396,15 @@ Status Sorter::Open() { TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0]; unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true)); RETURN_IF_ERROR(unsorted_run_->Init()); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(sort_tuple_expr_evals_, state_)); return Status::OK(); } +void Sorter::FreeLocalAllocations() { + compare_less_than_.FreeLocalAllocations(); + ScalarExprEvaluator::FreeLocalAllocations(sort_tuple_expr_evals_); +} + Status Sorter::AddBatch(RowBatch* batch) { DCHECK(unsorted_run_ != NULL); DCHECK(batch != NULL); @@ -1459,10 +1469,11 @@ void Sorter::Reset() { obj_pool_.Clear(); } -void Sorter::Close() { +void Sorter::Close(RuntimeState* state) { CleanupAllRuns(); block_mgr_->ClearReservations(block_mgr_client_); obj_pool_.Clear(); + ScalarExprEvaluator::Close(sort_tuple_expr_evals_, state); } void Sorter::CleanupAllRuns() { @@ -1575,7 +1586,6 @@ Status Sorter::CreateMerger(int max_num_runs) { return Status::OK(); } - Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) { RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(), mem_tracker_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/sorter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h index d91afa8..da3c6ef 100644 --- a/be/src/runtime/sorter.h +++ b/be/src/runtime/sorter.h @@ -37,7 +37,7 @@ class RowBatch; /// The client API for Sorter is as follows: /// AddBatch() is used to add input rows to be sorted. 
Multiple tuples in an input row are /// materialized into a row with a single tuple (the sort tuple) using the materialization -/// exprs in sort_tuple_slot_expr_ctxs_. The sort tuples are sorted according to the sort +/// exprs in sort_tuple_exprs_. The sort tuples are sorted according to the sort /// parameters and output by the sorter. /// AddBatch() can be called multiple times. // @@ -89,22 +89,24 @@ class RowBatch; /// tuples in place. class Sorter { public: - /// sort_tuple_slot_exprs are the slot exprs used to materialize the tuple to be sorted. - /// compare_less_than is a comparator for the sort tuples (returns true if lhs < rhs). - /// merge_batch_size_ is the size of the batches created to provide rows to the merger - /// and retrieve rows from an intermediate merger. + /// 'sort_tuple_exprs' are the slot exprs used to materialize the tuples to be + /// sorted. 'compare_less_than' is a comparator for the sort tuples (returns true if + /// lhs < rhs). 'merge_batch_size_' is the size of the batches created to provide rows + /// to the merger and retrieve rows from an intermediate merger. Sorter(const TupleRowComparator& compare_less_than, - const std::vector<ExprContext*>& sort_tuple_slot_expr_ctxs, + const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc, MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state); ~Sorter(); /// Initial set-up of the sorter for execution. Registers with the block mgr. - Status Prepare() WARN_UNUSED_RESULT; + /// The evaluators for 'sort_tuple_exprs_' will be created and stored in 'obj_pool'. + /// All allocation from the evaluators will be from 'expr_mem_pool'. + Status Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) WARN_UNUSED_RESULT; - /// Open the sorter for adding rows. Must be called after Prepare() or Reset() and - /// before calling AddBatch(). + /// Opens the sorter for adding rows and initializes the evaluators for materializing + /// the tuples. 
Must be called after Prepare() or Reset() and before calling AddBatch(). Status Open() WARN_UNUSED_RESULT; /// Adds a batch of input rows to the current unsorted run. @@ -117,13 +119,16 @@ class Sorter { /// Get the next batch of sorted output rows from the sorter. Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT; + /// Free any local allocations made when materializing and sorting the tuples. + void FreeLocalAllocations(); + /// Resets all internal state like ExecNode::Reset(). /// Init() must have been called, AddBatch()/GetNext()/InputDone() /// may or may not have been called. void Reset(); /// Close the Sorter and free resources. - void Close(); + void Close(RuntimeState* state); private: class Run; @@ -175,9 +180,9 @@ class Sorter { /// True if the tuples to be sorted have var-length slots. bool has_var_len_slots_; - /// Expressions used to materialize the sort tuple. Contains one expr per slot in the - /// tuple. - std::vector<ExprContext*> sort_tuple_slot_expr_ctxs_; + /// Expressions used to materialize the sort tuple. One expr per slot in the tuple. + const std::vector<ScalarExpr*>& sort_tuple_exprs_; + std::vector<ScalarExprEvaluator*> sort_tuple_expr_evals_; /// Mem tracker for batches created during merge. Not owned by Sorter. 
MemTracker* mem_tracker_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/tuple.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc index 67a238d..86f199d 100644 --- a/be/src/runtime/tuple.cc +++ b/be/src/runtime/tuple.cc @@ -22,8 +22,8 @@ #include "codegen/codegen-anyval.h" #include "codegen/llvm-codegen.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/collection-value.h" #include "runtime/descriptors.h" #include "runtime/mem-pool.h" @@ -203,20 +203,20 @@ void Tuple::ConvertOffsetsToPointers(const TupleDescriptor& desc, uint8_t* tuple } template <bool COLLECT_STRING_VALS, bool NO_POOL> -void Tuple::MaterializeExprs( - TupleRow* row, const TupleDescriptor& desc, ExprContext* const* materialize_expr_ctxs, - MemPool* pool, StringValue** non_null_string_values, int* total_string_lengths, +void Tuple::MaterializeExprs(TupleRow* row, const TupleDescriptor& desc, + ScalarExprEvaluator* const* evals, MemPool* pool, + StringValue** non_null_string_values, int* total_string_lengths, int* num_non_null_string_values) { ClearNullBits(desc); - // Evaluate the materialize_expr_ctxs and place the results in the tuple. + // Evaluate the materialize_expr_evals and place the results in the tuple. for (int i = 0; i < desc.slots().size(); ++i) { SlotDescriptor* slot_desc = desc.slots()[i]; // The FE ensures we don't get any TYPE_NULL expressions by picking an arbitrary type // when necessary, but does not do this for slot descs. 
// TODO: revisit this logic in the FE DCHECK(slot_desc->type().type == TYPE_NULL || - slot_desc->type() == materialize_expr_ctxs[i]->root()->type()); - void* src = materialize_expr_ctxs[i]->GetValue(row); + slot_desc->type() == evals[i]->root().type()); + void* src = evals[i]->GetValue(row); if (src != NULL) { void* dst = GetSlot(slot_desc->tuple_offset()); RawValue::Write(src, dst, slot_desc->type(), pool); @@ -241,7 +241,7 @@ void Tuple::MaterializeExprs( // ; Function Attrs: alwaysinline // define void @MaterializeExprs(%"class.impala::Tuple"* %opaque_tuple, // %"class.impala::TupleRow"* %row, %"class.impala::TupleDescriptor"* %desc, -// %"class.impala::ExprContext"** %materialize_expr_ctxs, +// %"class.impala::ScalarExprEvaluator"** %materialize_expr_evals, // %"class.impala::MemPool"* %pool, // %"struct.impala::StringValue"** %non_null_string_values, // i32* %total_string_lengths, i32* %num_non_null_string_values) #34 { @@ -291,7 +291,7 @@ void Tuple::MaterializeExprs( // ret void // } Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_vals, - const TupleDescriptor& desc, const vector<ExprContext*>& materialize_expr_ctxs, + const TupleDescriptor& desc, const vector<ScalarExpr*>& slot_materialize_exprs, bool use_mem_pool, Function** fn) { // Only support 'collect_string_vals' == false for now. 
if (collect_string_vals) { @@ -300,10 +300,10 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_ SCOPED_TIMER(codegen->codegen_timer()); LLVMContext& context = codegen->context(); - // Codegen each compute function from materialize_expr_ctxs - Function* materialize_expr_fns[materialize_expr_ctxs.size()]; - for (int i = 0; i < materialize_expr_ctxs.size(); ++i) { - Status status = materialize_expr_ctxs[i]->root()->GetCodegendComputeFn(codegen, + // Codegen each compute function from slot_materialize_exprs + Function* materialize_expr_fns[slot_materialize_exprs.size()]; + for (int i = 0; i < slot_materialize_exprs.size(); ++i) { + Status status = slot_materialize_exprs[i]->GetCodegendComputeFn(codegen, &materialize_expr_fns[i]); if (!status.ok()) { stringstream ss; @@ -315,13 +315,13 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_ // Construct function signature (this must exactly match the actual signature since it's // used in xcompiled IR). 
With 'pool': // void MaterializeExprs(Tuple* tuple, TupleRow* row, TupleDescriptor* desc, - // ExprContext** materialize_expr_ctxs, MemPool* pool, + // ScalarExprEvaluator** slot_materialize_exprs, MemPool* pool, // StringValue** non_null_string_values, int* total_string_lengths) PointerType* opaque_tuple_type = codegen->GetPtrType(Tuple::LLVM_CLASS_NAME); PointerType* row_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME); PointerType* desc_type = codegen->GetPtrType(TupleDescriptor::LLVM_CLASS_NAME); - PointerType* expr_ctxs_type = - codegen->GetPtrType(codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME)); + PointerType* expr_evals_type = + codegen->GetPtrType(codegen->GetPtrType(ScalarExprEvaluator::LLVM_CLASS_NAME)); PointerType* pool_type = codegen->GetPtrType(MemPool::LLVM_CLASS_NAME); PointerType* string_values_type = codegen->GetPtrType(codegen->GetPtrType(StringValue::LLVM_CLASS_NAME)); @@ -330,7 +330,7 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_ prototype.AddArgument("opaque_tuple", opaque_tuple_type); prototype.AddArgument("row", row_type); prototype.AddArgument("desc", desc_type); - prototype.AddArgument("materialize_expr_ctxs", expr_ctxs_type); + prototype.AddArgument("slot_materialize_exprs", expr_evals_type); prototype.AddArgument("pool", pool_type); prototype.AddArgument("non_null_string_values", string_values_type); prototype.AddArgument("total_string_lengths", int_ptr_type); @@ -342,7 +342,7 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_ Value* opaque_tuple_arg = args[0]; Value* row_arg = args[1]; // Value* desc_arg = args[2]; // unused - Value* expr_ctxs_arg = args[3]; + Value* expr_evals_arg = args[3]; Value* pool_arg = args[4]; // The followings arguments are unused as 'collect_string_vals' is false. 
// Value* non_null_string_values_arg = args[5]; // unused @@ -360,18 +360,18 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_ // Clear tuple's null bytes codegen->CodegenClearNullBits(&builder, tuple, desc); - // Evaluate the materialize_expr_ctxs and place the results in the tuple. + // Evaluate the slot_materialize_exprs and place the results in the tuple. for (int i = 0; i < desc.slots().size(); ++i) { SlotDescriptor* slot_desc = desc.slots()[i]; DCHECK(slot_desc->type().type == TYPE_NULL || - slot_desc->type() == materialize_expr_ctxs[i]->root()->type()); + slot_desc->type() == slot_materialize_exprs[i]->type()); - // Call materialize_expr_fns[i](materialize_expr_ctxs[i], row) - Value* expr_ctx = codegen->CodegenArrayAt(&builder, expr_ctxs_arg, i, "expr_ctx"); - Value* expr_args[] = { expr_ctx, row_arg }; + // Call materialize_expr_fns[i](slot_materialize_exprs[i], row) + Value* expr_eval = + codegen->CodegenArrayAt(&builder, expr_evals_arg, i, "expr_eval"); + Value* expr_args[] = { expr_eval, row_arg }; CodegenAnyVal src = CodegenAnyVal::CreateCallWrapped(codegen, &builder, - materialize_expr_ctxs[i]->root()->type(), - materialize_expr_fns[i], expr_args, "src"); + slot_materialize_exprs[i]->type(), materialize_expr_fns[i], expr_args, "src"); // Write expr result 'src' to slot src.WriteToSlot(*slot_desc, tuple, use_mem_pool ? 
pool_arg : nullptr); @@ -384,11 +384,11 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_ } template void Tuple::MaterializeExprs<false, false>(TupleRow*, const TupleDescriptor&, - ExprContext* const*, MemPool*, StringValue**, int*, int*); + ScalarExprEvaluator* const*, MemPool*, StringValue**, int*, int*); template void Tuple::MaterializeExprs<false, true>(TupleRow*, const TupleDescriptor&, - ExprContext* const*, MemPool*, StringValue**, int*, int*); + ScalarExprEvaluator* const*, MemPool*, StringValue**, int*, int*); template void Tuple::MaterializeExprs<true, false>(TupleRow*, const TupleDescriptor&, - ExprContext* const*, MemPool*, StringValue**, int*, int*); + ScalarExprEvaluator* const*, MemPool*, StringValue**, int*, int*); template void Tuple::MaterializeExprs<true, true>(TupleRow*, const TupleDescriptor&, - ExprContext* const*, MemPool*, StringValue**, int*, int*); + ScalarExprEvaluator* const*, MemPool*, StringValue**, int*, int*); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/runtime/tuple.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h index 15b90a4..5503ab4 100644 --- a/be/src/runtime/tuple.h +++ b/be/src/runtime/tuple.h @@ -135,11 +135,11 @@ class Tuple { /// TODO: this function does not collect other var-len types such as collections. 
template <bool COLLECT_STRING_VALS, bool NULL_POOL> inline void IR_ALWAYS_INLINE MaterializeExprs(TupleRow* row, - const TupleDescriptor& desc, const std::vector<ExprContext*>& materialize_expr_ctxs, + const TupleDescriptor& desc, const std::vector<ScalarExprEvaluator*>& evals, MemPool* pool, std::vector<StringValue*>* non_null_string_values = NULL, int* total_string_lengths = NULL) { DCHECK_EQ(NULL_POOL, pool == NULL); - DCHECK_EQ(materialize_expr_ctxs.size(), desc.slots().size()); + DCHECK_EQ(evals.size(), desc.slots().size()); StringValue** non_null_string_values_array = NULL; int num_non_null_string_values = 0; if (COLLECT_STRING_VALS) { @@ -153,7 +153,7 @@ class Tuple { *total_string_lengths = 0; } MaterializeExprs<COLLECT_STRING_VALS, NULL_POOL>(row, desc, - materialize_expr_ctxs.data(), pool, non_null_string_values_array, + evals.data(), pool, non_null_string_values_array, total_string_lengths, &num_non_null_string_values); if (COLLECT_STRING_VALS) non_null_string_values->resize(num_non_null_string_values); } @@ -173,7 +173,7 @@ class Tuple { /// these cases when we replace the function calls during codegen. Please see comment /// of MaterializeExprs() for details. static Status CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_vals, - const TupleDescriptor& desc, const vector<ExprContext*>& materialize_expr_ctxs, + const TupleDescriptor& desc, const vector<ScalarExpr*>& slot_materialize_exprs, bool use_mem_pool, llvm::Function** fn); /// Turn null indicator bit on. For non-nullable slots, the mask will be 0 and @@ -247,7 +247,7 @@ class Tuple { /// codegen. 'num_non_null_string_values' must be initialized by the caller. 
template <bool COLLECT_STRING_VALS, bool NULL_POOL> void IR_NO_INLINE MaterializeExprs(TupleRow* row, const TupleDescriptor& desc, - ExprContext* const* materialize_expr_ctxs, MemPool* pool, + ScalarExprEvaluator* const* evals, MemPool* pool, StringValue** non_null_string_values, int* total_string_lengths, int* num_non_null_string_values); }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/service/client-request-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 6af0501..4fe7431 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -20,8 +20,6 @@ #include <limits> #include <gutil/strings/substitute.h> -#include "exprs/expr-context.h" -#include "exprs/expr.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/service/client-request-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index ea66022..66e206d 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -378,12 +378,6 @@ class ClientRequestState { Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows) WARN_UNUSED_RESULT; - /// Evaluates 'output_expr_ctxs_' against 'row' and output the evaluated row in - /// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'. - /// result and scales must have been resized to the number of columns before call. 
- Status GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales) - WARN_UNUSED_RESULT; - /// Gather and publish all required updates to the metastore Status UpdateCatalog() WARN_UNUSED_RESULT; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/service/fe-support.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index d3681f9..44976ec 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -26,16 +26,19 @@ #include "common/logging.h" #include "common/status.h" #include "exec/catalog-op-executor.h" -#include "exprs/expr-context.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "gen-cpp/Data_types.h" #include "gen-cpp/Frontend_types.h" #include "rpc/jni-thrift-util.h" #include "rpc/thrift-server.h" #include "runtime/client-cache.h" +#include "runtime/decimal-value.inline.h" #include "runtime/exec-env.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/lib-cache.h" +#include "runtime/mem-pool.h" +#include "runtime/raw-value.h" #include "runtime/runtime-state.h" #include "service/impala-server.h" #include "util/cpu-info.h" @@ -71,6 +74,81 @@ Java_org_apache_impala_service_FeSupport_NativeFeTestInit( exec_env->InitForFeTests(); } +// Serializes expression value 'value' to thrift structure TColumnValue 'col_val'. +// 'type' indicates the type of the expression value. 
+static void SetTColumnValue( + const void* value, const ColumnType& type, TColumnValue* col_val) { + if (value == nullptr) return; + DCHECK(col_val != nullptr); + + string tmp; + switch (type.type) { + case TYPE_BOOLEAN: + col_val->__set_bool_val(*reinterpret_cast<const bool*>(value)); + break; + case TYPE_TINYINT: + col_val->__set_byte_val(*reinterpret_cast<const int8_t*>(value)); + break; + case TYPE_SMALLINT: + col_val->__set_short_val(*reinterpret_cast<const int16_t*>(value)); + break; + case TYPE_INT: + col_val->__set_int_val(*reinterpret_cast<const int32_t*>(value)); + break; + case TYPE_BIGINT: + col_val->__set_long_val(*reinterpret_cast<const int64_t*>(value)); + break; + case TYPE_FLOAT: + col_val->__set_double_val(*reinterpret_cast<const float*>(value)); + break; + case TYPE_DOUBLE: + col_val->__set_double_val(*reinterpret_cast<const double*>(value)); + break; + case TYPE_DECIMAL: + switch (type.GetByteSize()) { + case 4: + col_val->string_val = + reinterpret_cast<const Decimal4Value*>(value)->ToString(type); + break; + case 8: + col_val->string_val = + reinterpret_cast<const Decimal8Value*>(value)->ToString(type); + break; + case 16: + col_val->string_val = + reinterpret_cast<const Decimal16Value*>(value)->ToString(type); + break; + default: + DCHECK(false) << "Bad Type: " << type; + } + col_val->__isset.string_val = true; + break; + case TYPE_STRING: + case TYPE_VARCHAR: { + const StringValue* string_val = reinterpret_cast<const StringValue*>(value); + tmp.assign(static_cast<char*>(string_val->ptr), string_val->len); + col_val->binary_val.swap(tmp); + col_val->__isset.binary_val = true; + break; + } + case TYPE_CHAR: + tmp.assign(StringValue::CharSlotToPtr(value, type), type.len); + col_val->binary_val.swap(tmp); + col_val->__isset.binary_val = true; + break; + case TYPE_TIMESTAMP: { + const uint8_t* uint8_val = reinterpret_cast<const uint8_t*>(value); + col_val->binary_val.assign(uint8_val, uint8_val + type.GetSlotSize()); + 
col_val->__isset.binary_val = true; + RawValue::PrintValue(value, type, -1, &col_val->string_val); + col_val->__isset.string_val = true; + break; + } + default: + DCHECK(false) << "bad GetValue() type: " << type.DebugString(); + } +} + // Evaluates a batch of const exprs and returns the results in a serialized // TResultRow, where each TColumnValue in the TResultRow stores the result of // a predicate evaluation. It requires JniUtil::Init() to have been @@ -111,16 +189,20 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow( THROW_IF_ERROR_RET( jni_frame.push(env), env, JniUtil::internal_exc_class(), result_bytes); + MemPool expr_mem_pool(state.query_mem_tracker()); + // Prepare() the exprs. Always Close() the exprs even in case of errors. - vector<ExprContext*> expr_ctxs; + vector<ScalarExpr*> exprs; + vector<ScalarExprEvaluator*> evals; for (const TExpr& texpr : texprs) { - ExprContext* ctx; - status = Expr::CreateExprTree(&obj_pool, texpr, &ctx); + ScalarExpr* expr; + status = ScalarExpr::Create(texpr, RowDescriptor(), &state, &expr); if (!status.ok()) goto error; - - // Add 'ctx' to vector so it will be closed if Prepare() fails. - expr_ctxs.push_back(ctx); - status = ctx->Prepare(&state, RowDescriptor(), state.query_mem_tracker()); + exprs.push_back(expr); + ScalarExprEvaluator* eval; + status = ScalarExprEvaluator::Create(*expr, &state, &obj_pool, &expr_mem_pool, + &eval); + evals.push_back(eval); if (!status.ok()) goto error; } @@ -138,14 +220,20 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow( } // Open() and evaluate the exprs. Always Close() the exprs even in case of errors. 
- for (ExprContext* expr_ctx : expr_ctxs) { - status = expr_ctx->Open(&state); + for (int i = 0; i < evals.size(); ++i) { + ScalarExprEvaluator* eval = evals[i]; + status = eval->Open(&state); if (!status.ok()) goto error; - TColumnValue val; - expr_ctx->EvaluateWithoutRow(&val); - status = expr_ctx->root()->GetFnContextError(expr_ctx); + void* result = eval->GetValue(nullptr); + status = eval->GetError(); if (!status.ok()) goto error; + // 'output_scale' should only be set for MathFunctions::RoundUpTo() + // with return type double. + const ColumnType& type = eval->root().type(); + DCHECK(eval->output_scale() == -1 || type.type == TYPE_DOUBLE); + TColumnValue val; + SetTColumnValue(result, type, &val); // Check for mem limit exceeded. status = state.CheckQueryState(); @@ -156,11 +244,13 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow( goto error; } - expr_ctx->Close(&state); + eval->Close(&state); + exprs[i]->Close(); results.push_back(val); } expr_results.__set_colVals(results); + expr_mem_pool.FreeAll(); status = SerializeThriftMsg(env, &expr_results, &result_bytes); if (!status.ok()) goto error; return result_bytes; @@ -168,7 +258,9 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow( error: DCHECK(!status.ok()); // Convert status to exception. Close all remaining expr contexts. 
- for (ExprContext* expr_ctx : expr_ctxs) expr_ctx->Close(&state); + for (ScalarExprEvaluator* eval: evals) eval->Close(&state); + for (ScalarExpr* expr : exprs) expr->Close(); + expr_mem_pool.FreeAll(); (env)->ThrowNew(JniUtil::internal_exc_class(), status.GetDetail().c_str()); return result_bytes; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 1bb2e8a..2528349 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -33,7 +33,6 @@ #include "common/logging.h" #include "common/version.h" -#include "exprs/expr.h" #include "rpc/thrift-util.h" #include "runtime/raw-value.h" #include "runtime/exec-env.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/service/impalad-main.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc index 805e0f2..3acce47 100644 --- a/be/src/service/impalad-main.cc +++ b/be/src/service/impalad-main.cc @@ -67,7 +67,7 @@ int ImpaladMain(int argc, char** argv) { ABORT_IF_ERROR(HBaseTableScanner::Init()); ABORT_IF_ERROR(HBaseTable::InitJNI()); ABORT_IF_ERROR(HBaseTableWriter::InitJNI()); - ABORT_IF_ERROR(HiveUdfCall::Init()); + ABORT_IF_ERROR(HiveUdfCall::InitEnv()); InitFeSupport(); if (FLAGS_enable_rm) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/testutil/desc-tbl-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/desc-tbl-builder.cc b/be/src/testutil/desc-tbl-builder.cc index b4b0a1f..77be724 100644 --- a/be/src/testutil/desc-tbl-builder.cc +++ b/be/src/testutil/desc-tbl-builder.cc @@ -61,7 +61,7 @@ DescriptorTbl* DescriptorTblBuilder::Build() { 
DCHECK(buildDescTblStatus.ok()) << buildDescTblStatus.GetDetail(); DescriptorTbl* desc_tbl; - Status status = DescriptorTbl::Create(obj_pool_, thrift_desc_tbl_, nullptr, &desc_tbl); + Status status = DescriptorTbl::Create(obj_pool_, thrift_desc_tbl_, &desc_tbl); DCHECK(status.ok()) << status.GetDetail(); return desc_tbl; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/udf/udf-internal.h ---------------------------------------------------------------------- diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h index dc8b0b5..0d75d81 100644 --- a/be/src/udf/udf-internal.h +++ b/be/src/udf/udf-internal.h @@ -39,10 +39,10 @@ namespace impala { } \ } while (false) -class Expr; class FreePool; class MemPool; class RuntimeState; +class ScalarExpr; /// This class actually implements the interface of FunctionContext. This is split to /// hide the details from the external header. @@ -51,8 +51,8 @@ class RuntimeState; /// Exprs (e.g. UDFs and UDAs) require a FunctionContext to store state related to /// evaluation of the expression. Each FunctionContext is associated with a backend Expr /// or AggFnEvaluator, which is derived from a TExprNode generated by the Impala frontend. -/// FunctionContexts are allocated and managed by ExprContext. Exprs shouldn't try to -/// create FunctionContext themselves. +/// FunctionContexts are allocated and managed by ScalarExprEvaluator. Exprs shouldn't try +/// to create FunctionContext themselves. class FunctionContextImpl { public: /// Create a FunctionContext for a UDF. Caller is responsible for deleting it. @@ -105,19 +105,18 @@ class FunctionContextImpl { /// Sets the constant arg list. The vector should contain one entry per argument, /// with a non-NULL entry if the argument is constant. The AnyVal* values are - /// owned by the caller and must be allocated from the ExprContext's MemPool. + /// owned by the caller and must be allocated from the ScalarExprEvaluator's MemPool. 
void SetConstantArgs(std::vector<impala_udf::AnyVal*>&& constant_args); + typedef std::vector<std::pair<ScalarExpr*, impala_udf::AnyVal*>> NonConstantArgsVector; + /// Sets the non-constant args. Contains one entry per non-constant argument. All /// pointers should be non-NULL. The Expr* and AnyVal* values are owned by the caller. - /// The AnyVal* values must be allocated from the ExprContext's MemPool. - void SetNonConstantArgs( - std::vector<std::pair<Expr*, impala_udf::AnyVal*>>&& non_constant_args); + /// The AnyVal* values must be allocated from the ScalarExprEvaluator's MemPool. + void SetNonConstantArgs(NonConstantArgsVector&& non_constant_args); const std::vector<impala_udf::AnyVal*>& constant_args() const { return constant_args_; } - const std::vector<std::pair<Expr*, impala_udf::AnyVal*>>& non_constant_args() const { - return non_constant_args_; - } + const NonConstantArgsVector& non_constant_args() const { return non_constant_args_; } uint8_t* varargs_buffer() { return varargs_buffer_; } @@ -187,7 +186,7 @@ class FunctionContextImpl { private: friend class impala_udf::FunctionContext; - friend class ExprContext; + friend class ScalarExprEvaluator; /// A utility function which checks for memory limits and null pointers returned by /// Allocate(), Reallocate() and AllocateLocal() and sets the appropriate error status @@ -260,13 +259,13 @@ class FunctionContextImpl { /// Contains an AnyVal* for each argument of the function. If the AnyVal* is NULL, /// indicates that the corresponding argument is non-constant. Otherwise contains the /// value of the argument. The AnyVal* objects and associated data are owned by the - /// ExprContext provided when opening the FRAGMENT_LOCAL expression contexts. + /// ScalarExprEvaluator provided when opening the FRAGMENT_LOCAL expression contexts. std::vector<impala_udf::AnyVal*> constant_args_; /// Vector of all non-constant children expressions that need to be evaluated for /// each input row. 
The first element of each pair is the child expression and the /// second element is the value it must be evaluated into. - std::vector<std::pair<Expr*, impala_udf::AnyVal*>> non_constant_args_; + NonConstantArgsVector non_constant_args_; /// Used by ScalarFnCall to temporarily store arguments for a UDF when running without /// codegen. Allows us to pass AnyVal* arguments to the scalar function directly, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/udf/udf.cc ---------------------------------------------------------------------- diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc index a013638..212fd0c 100644 --- a/be/src/udf/udf.cc +++ b/be/src/udf/udf.cc @@ -413,7 +413,6 @@ bool FunctionContext::AddWarning(const char* warning_msg) { // In case of the SDK build, we simply, forward this call to a dummy method return impl_->state_->LogError(ss.str()); #endif - } else { cerr << ss.str() << endl; return true; @@ -494,8 +493,7 @@ void FunctionContextImpl::SetConstantArgs(vector<AnyVal*>&& constant_args) { constant_args_ = constant_args; } -void FunctionContextImpl::SetNonConstantArgs( - vector<pair<Expr*, AnyVal*>>&& non_constant_args) { +void FunctionContextImpl::SetNonConstantArgs(NonConstantArgsVector&& non_constant_args) { non_constant_args_ = non_constant_args; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/udf/udf.h ---------------------------------------------------------------------- diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index 725abe8..abec935 100644 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -108,7 +108,7 @@ class FunctionContext { /// concurrently on a single host if the UDF will be evaluated in multiple plan /// fragments on that host. In general, read-only state that doesn't need to be /// recomputed for every UDF call should be fragment-local. - /// TODO: not yet implemented + /// TODO: Move FRAGMENT_LOCAL states to query_state for multi-threading. 
FRAGMENT_LOCAL, /// Indicates that the function state is local to the execution thread. This state http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/util/tuple-row-compare.cc ---------------------------------------------------------------------- diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc index c159853..1784893 100644 --- a/be/src/util/tuple-row-compare.cc +++ b/be/src/util/tuple-row-compare.cc @@ -21,6 +21,8 @@ #include "codegen/codegen-anyval.h" #include "codegen/llvm-codegen.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/runtime-state.h" #include "util/runtime-profile-counters.h" @@ -28,20 +30,41 @@ using namespace impala; using namespace llvm; using namespace strings; +Status TupleRowComparator::Open( + ObjectPool* pool, RuntimeState* state, MemPool* expr_mem_pool) { + if (ordering_expr_evals_lhs_.empty()) { + RETURN_IF_ERROR(ScalarExprEvaluator::Create(ordering_exprs_, state, pool, + expr_mem_pool, &ordering_expr_evals_lhs_)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(ordering_expr_evals_lhs_, state)); + } + DCHECK_EQ(ordering_exprs_.size(), ordering_expr_evals_lhs_.size()); + if (ordering_expr_evals_rhs_.empty()) { + RETURN_IF_ERROR(ScalarExprEvaluator::Clone(pool, state, expr_mem_pool, + ordering_expr_evals_lhs_, &ordering_expr_evals_rhs_)); + } + DCHECK_EQ(ordering_expr_evals_lhs_.size(), ordering_expr_evals_rhs_.size()); + return Status::OK(); +} + +void TupleRowComparator::Close(RuntimeState* state) { + ScalarExprEvaluator::Close(ordering_expr_evals_rhs_, state); + ScalarExprEvaluator::Close(ordering_expr_evals_lhs_, state); +} + int TupleRowComparator::CompareInterpreted( const TupleRow* lhs, const TupleRow* rhs) const { - DCHECK_EQ(key_expr_ctxs_lhs_.size(), key_expr_ctxs_rhs_.size()); - for (int i = 0; i < key_expr_ctxs_lhs_.size(); ++i) { - void* lhs_value = key_expr_ctxs_lhs_[i]->GetValue(lhs); - void* rhs_value = 
key_expr_ctxs_rhs_[i]->GetValue(rhs); + DCHECK_EQ(ordering_exprs_.size(), ordering_expr_evals_lhs_.size()); + DCHECK_EQ(ordering_expr_evals_lhs_.size(), ordering_expr_evals_rhs_.size()); + for (int i = 0; i < ordering_expr_evals_lhs_.size(); ++i) { + void* lhs_value = ordering_expr_evals_lhs_[i]->GetValue(lhs); + void* rhs_value = ordering_expr_evals_rhs_[i]->GetValue(rhs); // The sort order of NULLs is independent of asc/desc. if (lhs_value == NULL && rhs_value == NULL) continue; if (lhs_value == NULL && rhs_value != NULL) return nulls_first_[i]; if (lhs_value != NULL && rhs_value == NULL) return -nulls_first_[i]; - int result = - RawValue::Compare(lhs_value, rhs_value, key_expr_ctxs_lhs_[i]->root()->type()); + int result = RawValue::Compare(lhs_value, rhs_value, ordering_exprs_[i]->type()); if (!is_asc_[i]) result = -result; if (result != 0) return result; // Otherwise, try the next Expr @@ -65,8 +88,10 @@ Status TupleRowComparator::Codegen(RuntimeState* state) { // Example IR for comparing an int column then a float column: // // ; Function Attrs: alwaysinline -// define i32 @Compare(%"class.impala::ExprContext"** %key_expr_ctxs_lhs, -// %"class.impala::ExprContext"** %key_expr_ctxs_rhs, +// define i32 @Compare(%"class.impala::ScalarExprEvaluator"** +// %ordering_expr_evals_lhs, +// %"class.impala::ScalarExprEvaluator"** +// %ordering_expr_evals_rhs, // %"class.impala::TupleRow"* %lhs, // %"class.impala::TupleRow"* %rhs) #20 { // entry: @@ -76,14 +101,16 @@ Status TupleRowComparator::Codegen(RuntimeState* state) { // %type = alloca %"struct.impala::ColumnType" // %2 = alloca i32 // %3 = alloca i32 -// %4 = getelementptr %"class.impala::ExprContext"** %key_expr_ctxs_lhs, i32 0 -// %5 = load %"class.impala::ExprContext"** %4 +// %4 = getelementptr %"class.impala::ScalarExprEvaluator"** +// %ordering_expr_evals_lhs, i32 0 +// %5 = load %"class.impala::ScalarExprEvaluator"** %4 // %lhs_value = call i64 @GetSlotRef( -// %"class.impala::ExprContext"* %5, 
%"class.impala::TupleRow"* %lhs) -// %6 = getelementptr %"class.impala::ExprContext"** %key_expr_ctxs_rhs, i32 0 -// %7 = load %"class.impala::ExprContext"** %6 +// %"class.impala::ScalarExprEvaluator"* %5, %"class.impala::TupleRow"* %lhs) +// %6 = getelementptr %"class.impala::ScalarExprEvaluator"** +// %ordering_expr_evals_rhs, i32 0 +// %7 = load %"class.impala::ScalarExprEvaluator"** %6 // %rhs_value = call i64 @GetSlotRef( -// %"class.impala::ExprContext"* %7, %"class.impala::TupleRow"* %rhs) +// %"class.impala::ScalarExprEvaluator"* %7, %"class.impala::TupleRow"* %rhs) // %is_null = trunc i64 %lhs_value to i1 // %is_null1 = trunc i64 %rhs_value to i1 // %both_null = and i1 %is_null, %is_null1 @@ -123,14 +150,16 @@ Status TupleRowComparator::Codegen(RuntimeState* state) { // ret i32 %result // // next_key: ; preds = %rhs_non_null, %entry -// %15 = getelementptr %"class.impala::ExprContext"** %key_expr_ctxs_lhs, i32 1 -// %16 = load %"class.impala::ExprContext"** %15 +// %15 = getelementptr %"class.impala::ScalarExprEvaluator"** +// %ordering_expr_evals_lhs, i32 1 +// %16 = load %"class.impala::ScalarExprEvaluator"** %15 // %lhs_value3 = call i64 @GetSlotRef1( -// %"class.impala::ExprContext"* %16, %"class.impala::TupleRow"* %lhs) -// %17 = getelementptr %"class.impala::ExprContext"** %key_expr_ctxs_rhs, i32 1 -// %18 = load %"class.impala::ExprContext"** %17 +// %"class.impala::ScalarExprEvaluator"* %16, %"class.impala::TupleRow"* %lhs) +// %17 = getelementptr %"class.impala::ScalarExprEvaluator"** +// %ordering_expr_evals_rhs, i32 1 +// %18 = load %"class.impala::ScalarExprEvaluator"** %17 // %rhs_value4 = call i64 @GetSlotRef1( -// %"class.impala::ExprContext"* %18, %"class.impala::TupleRow"* %rhs) +// %"class.impala::ScalarExprEvaluator"* %18, %"class.impala::TupleRow"* %rhs) // %is_null5 = trunc i64 %lhs_value3 to i1 // %is_null6 = trunc i64 %rhs_value4 to i1 // %both_null8 = and i1 %is_null5, %is_null6 @@ -177,15 +206,10 @@ Status 
TupleRowComparator::Codegen(RuntimeState* state) { Status TupleRowComparator::CodegenCompare(LlvmCodeGen* codegen, Function** fn) { SCOPED_TIMER(codegen->codegen_timer()); LLVMContext& context = codegen->context(); - - // Get all the key compute functions from key_expr_ctxs_lhs_. The lhs and rhs functions - // are the same since they're clones, and key_expr_ctxs_rhs_ is not populated until - // Open() is called. - DCHECK(key_expr_ctxs_rhs_.empty()) << "rhs exprs should be clones of lhs!"; - Function* key_fns[key_expr_ctxs_lhs_.size()]; - for (int i = 0; i < key_expr_ctxs_lhs_.size(); ++i) { - Status status = - key_expr_ctxs_lhs_[i]->root()->GetCodegendComputeFn(codegen, &key_fns[i]); + const vector<ScalarExpr*>& ordering_exprs = ordering_exprs_; + Function* key_fns[ordering_exprs.size()]; + for (int i = 0; i < ordering_exprs.size(); ++i) { + Status status = ordering_exprs[i]->GetCodegendComputeFn(codegen, &key_fns[i]); if (!status.ok()) { return Status(Substitute("Could not codegen TupleRowComparator::Compare(): $0", status.GetDetail())); @@ -194,42 +218,43 @@ Status TupleRowComparator::CodegenCompare(LlvmCodeGen* codegen, Function** fn) { // Construct function signature (note that this is different than the interpreted // Compare() function signature): - // int Compare(ExprContext** key_expr_ctxs_lhs, ExprContext** key_expr_ctxs_rhs, + // int Compare(ScalarExprEvaluator** ordering_expr_evals_lhs, + // ScalarExprEvaluator** ordering_expr_evals_rhs, // TupleRow* lhs, TupleRow* rhs) - PointerType* expr_ctxs_type = - codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME)->getPointerTo(); + PointerType* expr_evals_type = + codegen->GetPtrPtrType(ScalarExprEvaluator::LLVM_CLASS_NAME); PointerType* tuple_row_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME); LlvmCodeGen::FnPrototype prototype(codegen, "Compare", codegen->int_type()); - prototype.AddArgument("key_expr_ctxs_lhs", expr_ctxs_type); - prototype.AddArgument("key_expr_ctxs_rhs", expr_ctxs_type); + 
prototype.AddArgument("ordering_expr_evals_lhs", expr_evals_type); + prototype.AddArgument("ordering_expr_evals_rhs", expr_evals_type); prototype.AddArgument("lhs", tuple_row_type); prototype.AddArgument("rhs", tuple_row_type); LlvmBuilder builder(context); Value* args[4]; *fn = prototype.GeneratePrototype(&builder, args); - Value* lhs_ctxs_arg = args[0]; - Value* rhs_ctxs_arg = args[1]; + Value* lhs_evals_arg = args[0]; + Value* rhs_evals_arg = args[1]; Value* lhs_arg = args[2]; Value* rhs_arg = args[3]; // Unrolled loop over each key expr - for (int i = 0; i < key_expr_ctxs_lhs_.size(); ++i) { + for (int i = 0; i < ordering_exprs.size(); ++i) { // The start of the next key expr after this one. Used to implement "continue" logic // in the unrolled loop. BasicBlock* next_key_block = BasicBlock::Create(context, "next_key", *fn); - // Call key_fns[i](key_expr_ctxs_lhs[i], lhs_arg) - Value* lhs_ctx = codegen->CodegenArrayAt(&builder, lhs_ctxs_arg, i); - Value* lhs_args[] = { lhs_ctx, lhs_arg }; + // Call key_fns[i](ordering_expr_evals_lhs[i], lhs_arg) + Value* lhs_eval = codegen->CodegenArrayAt(&builder, lhs_evals_arg, i); + Value* lhs_args[] = { lhs_eval, lhs_arg }; CodegenAnyVal lhs_value = CodegenAnyVal::CreateCallWrapped(codegen, &builder, - key_expr_ctxs_lhs_[i]->root()->type(), key_fns[i], lhs_args, "lhs_value"); + ordering_exprs[i]->type(), key_fns[i], lhs_args, "lhs_value"); - // Call key_fns[i](key_expr_ctxs_rhs[i], rhs_arg) - Value* rhs_ctx = codegen->CodegenArrayAt(&builder, rhs_ctxs_arg, i); - Value* rhs_args[] = { rhs_ctx, rhs_arg }; + // Call key_fns[i](ordering_expr_evals_rhs[i], rhs_arg) + Value* rhs_eval = codegen->CodegenArrayAt(&builder, rhs_evals_arg, i); + Value* rhs_args[] = { rhs_eval, rhs_arg }; CodegenAnyVal rhs_value = CodegenAnyVal::CreateCallWrapped(codegen, &builder, - key_expr_ctxs_lhs_[i]->root()->type(), key_fns[i], rhs_args, "rhs_value"); + ordering_exprs[i]->type(), key_fns[i], rhs_args, "rhs_value"); // Handle NULLs if necessary 
Value* lhs_null = lhs_value.GetIsNull(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/util/tuple-row-compare.h ---------------------------------------------------------------------- diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h index e2beed1..2db5604 100644 --- a/be/src/util/tuple-row-compare.h +++ b/be/src/util/tuple-row-compare.h @@ -20,9 +20,8 @@ #define IMPALA_UTIL_TUPLE_ROW_COMPARE_H_ #include "common/compiler-util.h" -#include "exec/sort-exec-exprs.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/descriptors.h" #include "runtime/raw-value.h" #include "runtime/raw-value.inline.h" @@ -31,6 +30,9 @@ namespace impala { +class RuntimeState; +class ScalarExprEvaluator; + /// A wrapper around types Comparator with a Less() method. This wrapper allows the use of /// type Comparator with STL containers which expect a type like std::less<T>, which uses /// operator() instead of Less() and is cheap to copy. @@ -58,50 +60,45 @@ class ComparatorWrapper { /// Compares two TupleRows based on a set of exprs, in order. class TupleRowComparator { public: - /// 'sort_key_exprs' must have already been prepared. + /// 'ordering_exprs': the ordering expressions for tuple comparison. /// 'is_asc' determines, for each expr, if it should be ascending or descending sort /// order. /// 'nulls_first' determines, for each expr, if nulls should come before or after all /// other values. 
- TupleRowComparator( - const SortExecExprs& sort_key_exprs, - const std::vector<bool>& is_asc, - const std::vector<bool>& nulls_first) - : key_expr_ctxs_lhs_(sort_key_exprs.lhs_ordering_expr_ctxs()), - key_expr_ctxs_rhs_(sort_key_exprs.rhs_ordering_expr_ctxs()), - is_asc_(is_asc), - codegend_compare_fn_(NULL) { - DCHECK_EQ(key_expr_ctxs_lhs_.size(), is_asc.size()); - DCHECK_EQ(key_expr_ctxs_lhs_.size(), nulls_first.size()); - nulls_first_.reserve(key_expr_ctxs_lhs_.size()); - for (int i = 0; i < key_expr_ctxs_lhs_.size(); ++i) { - nulls_first_.push_back(nulls_first[i] ? -1 : 1); - } + TupleRowComparator(const std::vector<ScalarExpr*>& ordering_exprs, + const std::vector<bool>& is_asc, const std::vector<bool>& nulls_first) + : ordering_exprs_(ordering_exprs), + is_asc_(is_asc), + codegend_compare_fn_(nullptr) { + DCHECK_EQ(is_asc_.size(), ordering_exprs.size()); + for (bool null_first : nulls_first) nulls_first_.push_back(null_first ? -1 : 1); } - TupleRowComparator(const SortExecExprs& sort_key_exprs, bool is_asc, bool nulls_first) - : key_expr_ctxs_lhs_(sort_key_exprs.lhs_ordering_expr_ctxs()), - key_expr_ctxs_rhs_(sort_key_exprs.rhs_ordering_expr_ctxs()), - is_asc_(key_expr_ctxs_lhs_.size(), is_asc), - nulls_first_(key_expr_ctxs_lhs_.size(), nulls_first ? -1 : 1), - codegend_compare_fn_(NULL) {} + /// Create the evaluators for the ordering expressions and store them in 'pool'. Any + /// allocation during initialization of the evaluators will come from 'expr_mem_pool'. + /// 'state' is passed in for initialization of the evaluator. + Status Open(ObjectPool* pool, RuntimeState* state, MemPool* expr_mem_pool); + + /// Release resources held by the ordering expressions' evaluators. + void Close(RuntimeState* state); /// Codegens a Compare() function for this comparator that is used in Compare(). Status Codegen(RuntimeState* state); - /// Returns a negative value if lhs is less than rhs, a positive value if lhs is greater - /// than rhs, or 0 if they are equal. 
All exprs (key_exprs_lhs_ and key_exprs_rhs_) must - /// have been prepared and opened before calling this, i.e. 'sort_key_exprs' in the - /// constructor must have been opened. + /// Returns a negative value if lhs is less than rhs, a positive value if lhs is + /// greater than rhs, or 0 if they are equal. All evaluators (ordering_expr_evals_lhs_ + /// and ordering_expr_evals_rhs_) must have been created and opened via Open() before + /// calling this. int ALWAYS_INLINE Compare(const TupleRow* lhs, const TupleRow* rhs) const { return codegend_compare_fn_ == NULL ? CompareInterpreted(lhs, rhs) : - (*codegend_compare_fn_)(&key_expr_ctxs_lhs_[0], &key_expr_ctxs_rhs_[0], lhs, rhs); + (*codegend_compare_fn_)(ordering_expr_evals_lhs_.data(), + ordering_expr_evals_rhs_.data(), lhs, rhs); } /// Returns true if lhs is strictly less than rhs. - /// All exprs (key_exprs_lhs_ and key_exprs_rhs_) must have been prepared and opened - /// before calling this. + /// All evaluators (ordering_expr_evals_lhs_ and ordering_expr_evals_rhs_) must have + /// been created and opened via Open() before calling this. /// Force inlining because it tends not to be always inlined at callsites, even in /// hot loops. bool ALWAYS_INLINE Less(const TupleRow* lhs, const TupleRow* rhs) const { @@ -116,8 +113,8 @@ class TupleRowComparator { /// Free any local allocations made during expression evaluations in Compare(). void FreeLocalAllocations() const { - ExprContext::FreeLocalAllocations(key_expr_ctxs_lhs_); - ExprContext::FreeLocalAllocations(key_expr_ctxs_rhs_); + ScalarExprEvaluator::FreeLocalAllocations(ordering_expr_evals_lhs_); + ScalarExprEvaluator::FreeLocalAllocations(ordering_expr_evals_rhs_); } private: @@ -129,14 +126,16 @@ class TupleRowComparator { /// pointer. Status CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn); - /// References to ExprContexts managed by SortExecExprs.
The lhs ExprContexts must - /// be created and prepared before the TupleRowCompator is constructed, but the rhs - /// ExprContexts are only created via cloning when SortExecExprs is Open()ed (which - /// may be after the TupleRowComparator is constructed). - const std::vector<ExprContext*>& key_expr_ctxs_lhs_; - const std::vector<ExprContext*>& key_expr_ctxs_rhs_; + /// References to ordering expressions owned by the Exec node which owns this + /// TupleRowComparator. + const std::vector<ScalarExpr*>& ordering_exprs_; + + /// The evaluators for the LHS and RHS ordering expressions. The RHS evaluators are + /// created by cloning the LHS evaluators after they have been opened in Open(). + std::vector<ScalarExprEvaluator*> ordering_expr_evals_lhs_; + std::vector<ScalarExprEvaluator*> ordering_expr_evals_rhs_; - std::vector<bool> is_asc_; + const std::vector<bool>& is_asc_; std::vector<int8_t> nulls_first_; /// We store a pointer to the codegen'd function pointer (adding an extra level of @@ -146,8 +145,8 @@ class TupleRowComparator { /// TupleRowComparator is copied before the module is compiled, the copy will still have /// its function pointer set to NULL. The function pointer is allocated from the runtime /// state's object pool so that its lifetime will be >= that of any copies.
- typedef int (*CompareFn)(ExprContext* const*, ExprContext* const*, const TupleRow*, - const TupleRow*); + typedef int (*CompareFn)(ScalarExprEvaluator* const*, ScalarExprEvaluator* const*, + const TupleRow*, const TupleRow*); CompareFn* codegend_compare_fn_; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/common/thrift/CatalogObjects.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 45f103c..7894f75 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -244,6 +244,7 @@ struct THdfsPartition { 4: required byte mapKeyDelim 5: required byte escapeChar 6: required THdfsFileFormat fileFormat + // These are Literal expressions 7: list<Exprs.TExpr> partitionKeyExprs 8: required i32 blockSize 9: optional list<THdfsFileDesc> file_desc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/common/thrift/Types.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift index 30d168d..b271dd9 100644 --- a/common/thrift/Types.thrift +++ b/common/thrift/Types.thrift @@ -185,15 +185,15 @@ struct TScalarFunction { struct TAggregateFunction { 1: required TColumnType intermediate_type - 2: required string update_fn_symbol - 3: required string init_fn_symbol - 4: optional string serialize_fn_symbol - 5: optional string merge_fn_symbol - 6: optional string finalize_fn_symbol + 2: required bool is_analytic_only_fn + 3: required string update_fn_symbol + 4: required string init_fn_symbol + 5: optional string serialize_fn_symbol + 6: optional string merge_fn_symbol + 7: optional string finalize_fn_symbol 8: optional string get_value_fn_symbol 9: optional string remove_fn_symbol - - 7: optional bool ignores_distinct + 10: optional bool ignores_distinct } // Represents a function in the Catalog. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index bccb779..80e054e 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -323,6 +323,9 @@ error_codes = ( # TODO: IMPALA-3200: make sure that this references the correct query option. ("MAX_ROW_SIZE", 104, "Row of size $0 could not be materialized in plan node with " "id $1. Limit is $2, which can be increased with query option max_row_size"), + + ("IR_VERIFY_FAILED", 105, + "Failed to verify generated IR function $0, see log for more details."), ) import sys http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java b/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java index bd87f80..d944fb4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java +++ b/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java @@ -25,6 +25,7 @@ import org.apache.impala.analysis.HdfsUri; import org.apache.impala.thrift.TAggregateFunction; import org.apache.impala.thrift.TFunction; import org.apache.impala.thrift.TFunctionBinaryType; +import com.google.common.base.Preconditions; /** * Internal representation of an aggregate function. 
@@ -123,6 +124,9 @@ public class AggregateFunction extends Function { String serializeFnSymbol, String getValueFnSymbol, String removeFnSymbol, String finalizeFnSymbol, boolean ignoresDistinct, boolean isAnalyticFn, boolean returnsNonNullOnEmpty) { + Preconditions.checkState(initFnSymbol != null); + Preconditions.checkState(updateFnSymbol != null); + Preconditions.checkState(mergeFnSymbol != null); AggregateFunction fn = new AggregateFunction(new FunctionName(db.getName(), name), argTypes, retType, intermediateType, null, updateFnSymbol, initFnSymbol, serializeFnSymbol, mergeFnSymbol, getValueFnSymbol, removeFnSymbol, @@ -216,6 +220,7 @@ public class AggregateFunction extends Function { public TFunction toThrift() { TFunction fn = super.toThrift(); TAggregateFunction agg_fn = new TAggregateFunction(); + agg_fn.setIs_analytic_only_fn(isAnalyticFn_ && !isAggregateFn_); agg_fn.setUpdate_fn_symbol(updateFnSymbol_); agg_fn.setInit_fn_symbol(initFnSymbol_); if (serializeFnSymbol_ != null) agg_fn.setSerialize_fn_symbol(serializeFnSymbol_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/fe/src/main/java/org/apache/impala/planner/SortNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java index 177565a..c06d320 100644 --- a/fe/src/main/java/org/apache/impala/planner/SortNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java @@ -161,7 +161,7 @@ public class SortNode extends PlanNode { info_.getIsAscOrder(), info_.getNullsFirst()); Preconditions.checkState(tupleIds_.size() == 1, "Incorrect size for tupleIds_ in SortNode"); - sort_info.sort_tuple_slot_exprs = Expr.treesToThrift(resolvedTupleExprs_); + sort_info.setSort_tuple_slot_exprs(Expr.treesToThrift(resolvedTupleExprs_)); TSortNode sort_node = new TSortNode(sort_info, useTopN_); sort_node.setOffset(offset_); msg.sort_node = 
sort_node;
