http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/analytic-eval-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc index 083ab1a..2aabad1 100644 --- a/be/src/exec/analytic-eval-node.cc +++ b/be/src/exec/analytic-eval-node.cc @@ -19,7 +19,10 @@ #include <gutil/strings/substitute.h> +#include "exprs/agg-fn.h" #include "exprs/agg-fn-evaluator.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/buffered-tuple-stream.inline.h" #include "runtime/descriptors.h" #include "runtime/mem-tracker.h" @@ -44,25 +47,21 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, descs.GetTupleDescriptor(tnode.analytic_node.intermediate_tuple_id)), result_tuple_desc_( descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)), - buffered_tuple_desc_(NULL), - partition_by_eq_expr_ctx_(NULL), - order_by_eq_expr_ctx_(NULL), rows_start_offset_(0), rows_end_offset_(0), has_first_val_null_offset_(false), first_val_null_offset_(0), - client_(NULL), - child_tuple_cmp_row_(NULL), + client_(nullptr), + child_tuple_cmp_row_(nullptr), last_result_idx_(-1), prev_pool_last_result_idx_(-1), prev_pool_last_window_idx_(-1), - curr_tuple_(NULL), - dummy_result_tuple_(NULL), + curr_tuple_(nullptr), + dummy_result_tuple_(nullptr), curr_partition_idx_(-1), - prev_input_row_(NULL), - input_stream_(NULL), + input_stream_(nullptr), input_eos_(false), - evaluation_timer_(NULL) { + evaluation_timer_(nullptr) { if (tnode.analytic_node.__isset.buffered_tuple_id) { buffered_tuple_desc_ = descs.GetTupleDescriptor( tnode.analytic_node.buffered_tuple_id); @@ -105,36 +104,48 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, AnalyticEvalNode::~AnalyticEvalNode() { // Check that we didn't leak any memory. 
- DCHECK(input_stream_ == NULL || input_stream_->is_closed()); + DCHECK(input_stream_ == nullptr || input_stream_->is_closed()); } Status AnalyticEvalNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Init(tnode, state)); - DCHECK_EQ(conjunct_ctxs_.size(), 0); + DCHECK_EQ(conjunct_evals_.size(), 0); const TAnalyticNode& analytic_node = tnode.analytic_node; bool has_lead_fn = false; + for (int i = 0; i < analytic_node.analytic_functions.size(); ++i) { - AggFnEvaluator* evaluator; - RETURN_IF_ERROR(AggFnEvaluator::Create( - pool_, analytic_node.analytic_functions[i], true, &evaluator)); - evaluators_.push_back(evaluator); + AggFn* analytic_fn; + RETURN_IF_ERROR(AggFn::Create(analytic_node.analytic_functions[i], + child(0)->row_desc(), *(intermediate_tuple_desc_->slots()[i]), + *(result_tuple_desc_->slots()[i]), state, &analytic_fn)); + analytic_fns_.push_back(analytic_fn); + DCHECK(!analytic_fn->is_merge()); const TFunction& fn = analytic_node.analytic_functions[i].nodes[0].fn; - is_lead_fn_.push_back("lead" == fn.name.function_name); - has_lead_fn = has_lead_fn || is_lead_fn_.back(); + const bool is_lead_fn = fn.name.function_name == "lead"; + is_lead_fn_.push_back(is_lead_fn); + has_lead_fn |= is_lead_fn; } DCHECK(!has_lead_fn || !window_.__isset.window_start); DCHECK(fn_scope_ != PARTITION || analytic_node.order_by_exprs.empty()); DCHECK(window_.__isset.window_end || !window_.__isset.window_start) << "UNBOUNDED FOLLOWING is only supported with UNBOUNDED PRECEDING."; - if (analytic_node.__isset.partition_by_eq) { - DCHECK(analytic_node.__isset.buffered_tuple_id); - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.partition_by_eq, - &partition_by_eq_expr_ctx_)); - } - if (analytic_node.__isset.order_by_eq) { + + if (analytic_node.__isset.partition_by_eq || analytic_node.__isset.order_by_eq) { DCHECK(analytic_node.__isset.buffered_tuple_id); - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.order_by_eq, - 
&order_by_eq_expr_ctx_)); + DCHECK(buffered_tuple_desc_ != nullptr); + vector<TTupleId> tuple_ids; + tuple_ids.push_back(child(0)->row_desc().tuple_descriptors()[0]->id()); + tuple_ids.push_back(buffered_tuple_desc_->id()); + RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false)); + + if (analytic_node.__isset.partition_by_eq) { + RETURN_IF_ERROR(ScalarExpr::Create(analytic_node.partition_by_eq, cmp_row_desc, + state, &partition_by_eq_expr_)); + } + if (analytic_node.__isset.order_by_eq) { + RETURN_IF_ERROR(ScalarExpr::Create(analytic_node.order_by_eq, cmp_row_desc, + state, &order_by_eq_expr_)); + } } return Status::OK(); } @@ -146,35 +157,23 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) { curr_tuple_pool_.reset(new MemPool(mem_tracker())); prev_tuple_pool_.reset(new MemPool(mem_tracker())); mem_pool_.reset(new MemPool(mem_tracker())); - fn_pool_.reset(new MemPool(mem_tracker())); + fn_pool_.reset(new MemPool(expr_mem_tracker())); evaluation_timer_ = ADD_TIMER(runtime_profile(), "EvaluationTime"); - DCHECK_EQ(result_tuple_desc_->slots().size(), evaluators_.size()); - for (int i = 0; i < evaluators_.size(); ++i) { - impala_udf::FunctionContext* ctx; - RETURN_IF_ERROR(evaluators_[i]->Prepare(state, child(0)->row_desc(), - intermediate_tuple_desc_->slots()[i], result_tuple_desc_->slots()[i], - fn_pool_.get(), &ctx)); - fn_ctxs_.push_back(ctx); - state->obj_pool()->Add(ctx); + DCHECK_EQ(result_tuple_desc_->slots().size(), analytic_fns_.size()); + RETURN_IF_ERROR(AggFnEvaluator::Create(analytic_fns_, state, pool_, fn_pool_.get(), + &analytic_fn_evals_)); + + if (partition_by_eq_expr_ != nullptr) { + RETURN_IF_ERROR(ScalarExprEvaluator::Create(*partition_by_eq_expr_, state, pool_, + fn_pool_.get(), &partition_by_eq_expr_eval_)); + AddEvaluatorToFree(partition_by_eq_expr_eval_); } - if (partition_by_eq_expr_ctx_ != NULL || order_by_eq_expr_ctx_ != NULL) { - DCHECK(buffered_tuple_desc_ != NULL); - vector<TTupleId> tuple_ids; - 
tuple_ids.push_back(child(0)->row_desc().tuple_descriptors()[0]->id()); - tuple_ids.push_back(buffered_tuple_desc_->id()); - RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false)); - if (partition_by_eq_expr_ctx_ != NULL) { - RETURN_IF_ERROR( - partition_by_eq_expr_ctx_->Prepare(state, cmp_row_desc, expr_mem_tracker())); - AddExprCtxToFree(partition_by_eq_expr_ctx_); - } - if (order_by_eq_expr_ctx_ != NULL) { - RETURN_IF_ERROR( - order_by_eq_expr_ctx_->Prepare(state, cmp_row_desc, expr_mem_tracker())); - AddExprCtxToFree(order_by_eq_expr_ctx_); - } + if (order_by_eq_expr_ != nullptr) { + RETURN_IF_ERROR(ScalarExprEvaluator::Create(*order_by_eq_expr_, state, pool_, + fn_pool_.get(), &order_by_eq_expr_eval_)); + AddEvaluatorToFree(order_by_eq_expr_eval_); } // Must be kept in sync with AnalyticEvalNode.computeResourceProfile() in fe. @@ -191,8 +190,8 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(child(0)->Open(state)); - DCHECK(client_ != NULL); - DCHECK(input_stream_ == NULL); + DCHECK(client_ != nullptr); + DCHECK(input_stream_ == nullptr); input_stream_.reset( new BufferedTupleStream(state, child(0)->row_desc(), state->block_mgr(), client_, false /* use_initial_small_buffers */, true /* read_write */)); @@ -208,30 +207,29 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { return state->block_mgr()->MemLimitTooLowError(client_, id()); } - DCHECK_EQ(evaluators_.size(), fn_ctxs_.size()); - for (int i = 0; i < evaluators_.size(); ++i) { - RETURN_IF_ERROR(evaluators_[i]->Open(state, fn_ctxs_[i])); - DCHECK(!evaluators_[i]->is_merge()); - + for (int i = 0; i < analytic_fn_evals_.size(); ++i) { + RETURN_IF_ERROR(analytic_fn_evals_[i]->Open(state)); + FunctionContext* agg_fn_ctx = analytic_fn_evals_[i]->agg_fn_ctx(); if (!has_first_val_null_offset_ && - "first_value_rewrite" == evaluators_[i]->fn_name() && - fn_ctxs_[i]->GetNumArgs() == 2) { + 
"first_value_rewrite" == analytic_fns_[i]->fn_name() && + agg_fn_ctx->GetNumArgs() == 2) { DCHECK(!has_first_val_null_offset_); first_val_null_offset_ = - reinterpret_cast<BigIntVal*>(fn_ctxs_[i]->GetConstantArg(1))->val; + reinterpret_cast<BigIntVal*>(agg_fn_ctx->GetConstantArg(1))->val; VLOG_FILE << id() << " FIRST_VAL rewrite null offset: " << first_val_null_offset_; has_first_val_null_offset_ = true; } } - if (partition_by_eq_expr_ctx_ != NULL) { - RETURN_IF_ERROR(partition_by_eq_expr_ctx_->Open(state)); + if (partition_by_eq_expr_eval_ != nullptr) { + RETURN_IF_ERROR(partition_by_eq_expr_eval_->Open(state)); } - if (order_by_eq_expr_ctx_ != NULL) { - RETURN_IF_ERROR(order_by_eq_expr_ctx_->Open(state)); + + if (order_by_eq_expr_eval_ != nullptr) { + RETURN_IF_ERROR(order_by_eq_expr_eval_->Open(state)); } - if (buffered_tuple_desc_ != NULL) { + if (buffered_tuple_desc_ != nullptr) { // The backing mem_pool_ is freed in Reset(), so we need to allocate // a new row every time we Open(). child_tuple_cmp_row_ = reinterpret_cast<TupleRow*>( @@ -240,7 +238,7 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { // An intermediate tuple is only allocated once and is reused. curr_tuple_ = Tuple::Create(intermediate_tuple_desc_->byte_size(), mem_pool_.get()); - AggFnEvaluator::Init(evaluators_, fn_ctxs_, curr_tuple_); + AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_); // Allocate dummy_result_tuple_ even if AggFnEvaluator::Init() may have failed // as it is needed in Close(). 
// TODO: move this to Prepare() @@ -351,7 +349,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) { if (fn_scope_ != ROWS || !window_.__isset.window_start || stream_idx - rows_start_offset_ >= curr_partition_idx_) { VLOG_ROW << id() << " Update idx=" << stream_idx; - AggFnEvaluator::Add(evaluators_, fn_ctxs_, row, curr_tuple_); + AggFnEvaluator::Add(analytic_fn_evals_, row, curr_tuple_); if (window_.__isset.window_start) { VLOG_ROW << id() << " Adding tuple to window at idx=" << stream_idx; Tuple* tuple = row->GetTuple(0)->DeepCopy( @@ -384,20 +382,20 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) { Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) { VLOG_ROW << id() << " AddResultTuple idx=" << stream_idx; - DCHECK(curr_tuple_ != NULL); + DCHECK(curr_tuple_ != nullptr); MemPool* cur_tuple_pool = curr_tuple_pool_.get(); Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), cur_tuple_pool); - AggFnEvaluator::GetValue(evaluators_, fn_ctxs_, curr_tuple_, result_tuple); + AggFnEvaluator::GetValue(analytic_fn_evals_, curr_tuple_, result_tuple); // Copy any string data in 'result_tuple' into 'cur_tuple_pool_'. 
for (const SlotDescriptor* slot_desc : result_tuple_desc_->slots()) { if (!slot_desc->type().IsVarLenStringType()) continue; StringValue* sv = reinterpret_cast<StringValue*>( result_tuple->GetSlot(slot_desc->tuple_offset())); - if (sv == NULL || sv->len == 0) continue; + if (sv == nullptr || sv->len == 0) continue; char* new_ptr = reinterpret_cast<char*>(cur_tuple_pool->TryAllocate(sv->len)); - if (UNLIKELY(new_ptr == NULL)) { - return cur_tuple_pool->mem_tracker()->MemLimitExceeded(NULL, + if (UNLIKELY(new_ptr == nullptr)) { + return cur_tuple_pool->mem_tracker()->MemLimitExceeded(nullptr, "Failed to allocate memory for analytic function's result.", sv->len); } memcpy(new_ptr, sv->ptr, sv->len); @@ -419,7 +417,7 @@ inline Status AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition, VLOG_ROW << id() << " TryAddResultTupleForPrevRow partition=" << next_partition << " idx=" << stream_idx; if (fn_scope_ != ROWS && (next_partition || (fn_scope_ == RANGE && - window_.__isset.window_end && !PrevRowCompare(order_by_eq_expr_ctx_)))) { + window_.__isset.window_end && !PrevRowCompare(order_by_eq_expr_eval_)))) { RETURN_IF_ERROR(AddResultTuple(stream_idx - 1)); } return Status::OK(); @@ -450,7 +448,7 @@ inline void AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx) { DCHECK_EQ(remove_idx + max<int64_t>(rows_start_offset_, 0), window_tuples_.front().first) << DebugStateString(true); TupleRow* remove_row = reinterpret_cast<TupleRow*>(&window_tuples_.front().second); - AggFnEvaluator::Remove(evaluators_, fn_ctxs_, remove_row, curr_tuple_); + AggFnEvaluator::Remove(analytic_fn_evals_, remove_row, curr_tuple_); window_tuples_.pop_front(); } @@ -469,11 +467,11 @@ inline Status AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx, // lead() is re-written to a ROWS window with an end bound FOLLOWING. Any remaining // results need the default value (set by Init()). If this is the case, the start bound // is UNBOUNDED PRECEDING (DCHECK in Init()). 
- for (int i = 0; i < evaluators_.size(); ++i) { + for (int i = 0; i < analytic_fn_evals_.size(); ++i) { if (is_lead_fn_[i]) { // Needs to call Finalize() to release resources. - evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_); - evaluators_[i]->Init(fn_ctxs_[i], curr_tuple_); + analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_); + analytic_fn_evals_[i]->Init(curr_tuple_); } } @@ -492,7 +490,7 @@ inline Status AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx, VLOG_ROW << id() << " Remove window_row_idx=" << window_tuples_.front().first << " for result row at idx=" << next_result_idx; TupleRow* remove_row = reinterpret_cast<TupleRow*>(&window_tuples_.front().second); - AggFnEvaluator::Remove(evaluators_, fn_ctxs_, remove_row, curr_tuple_); + AggFnEvaluator::Remove(analytic_fn_evals_, remove_row, curr_tuple_); window_tuples_.pop_front(); } RETURN_IF_ERROR(AddResultTuple(last_result_idx_ + 1)); @@ -522,7 +520,7 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state, // >= curr_partition_idx_ is the last result tuple of the previous partition. Adding // the last result tuple to result_tuples_ with a stream index curr_partition_idx_ - 1 // ensures that all rows in the previous partition have corresponding analytic results. 
- Tuple* prev_partition_last_result_tuple = NULL; while (!result_tuples_.empty() && result_tuples_.back().first >= curr_partition_idx_) { DCHECK(fn_scope_ == ROWS && window_.__isset.window_end && window_.window_end.type == TAnalyticWindowBoundaryType::PRECEDING); @@ -531,7 +529,7 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state, prev_partition_last_result_tuple = result_tuples_.back().second; result_tuples_.pop_back(); } - if (prev_partition_last_result_tuple != NULL) { + if (prev_partition_last_result_tuple != nullptr) { if (result_tuples_.empty() || result_tuples_.back().first < curr_partition_idx_ - 1) { // prev_partition_last_result_tuple was the last result tuple in the partition, add @@ -558,14 +556,14 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state, VLOG_ROW << id() << " Reset curr_tuple"; // Call finalize to release resources; result is not needed but the dst tuple must be // a tuple described by result_tuple_desc_. - AggFnEvaluator::Finalize(evaluators_, fn_ctxs_, curr_tuple_, dummy_result_tuple_); + AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_); // Re-initialize curr_tuple_. curr_tuple_->Init(intermediate_tuple_desc_->byte_size()); - AggFnEvaluator::Init(evaluators_, fn_ctxs_, curr_tuple_); + AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_); // Check for errors in AggFnEvaluator::Init(). RETURN_IF_ERROR(state->GetQueryStatus()); - // Add a result tuple containing values set by Init() (e.g. NULL for sum(), 0 for + // Add a result tuple containing values set by Init() (e.g. NULL for sum(), 0 for // count()) for output rows that have no input rows in the window. We need to add this // result tuple before any input rows are consumed and the evaluators are updated.
if (fn_scope_ == ROWS && window_.__isset.window_end && @@ -594,9 +592,9 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state, return Status::OK(); } -inline bool AnalyticEvalNode::PrevRowCompare(ExprContext* pred_ctx) { - DCHECK(pred_ctx != NULL); - BooleanVal result = pred_ctx->GetBooleanVal(child_tuple_cmp_row_); +inline bool AnalyticEvalNode::PrevRowCompare(ScalarExprEvaluator* pred_eval) { + DCHECK(pred_eval != nullptr); + BooleanVal result = pred_eval->GetBooleanVal(child_tuple_cmp_row_); DCHECK(!result.is_null); return result.val; } @@ -650,11 +648,12 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) { for (; batch_idx < curr_child_batch_->num_rows(); ++batch_idx, ++stream_idx) { TupleRow* row = curr_child_batch_->GetRow(batch_idx); - if (partition_by_eq_expr_ctx_ != NULL || order_by_eq_expr_ctx_ != NULL) { + if (partition_by_eq_expr_eval_ != nullptr || + order_by_eq_expr_eval_ != nullptr) { // Only set the tuples in child_tuple_cmp_row_ if there are partition exprs or // order by exprs that require comparing the current and previous rows. If there - // aren't partition or order by exprs (i.e. empty OVER() clause), there was no sort - // and there could be nullable tuples (whereas the sort node does not produce + // aren't partition or order by exprs (i.e. empty OVER() clause), there was no + // sort and there could be nullable tuples (whereas the sort node does not produce // them), see IMPALA-1562. child_tuple_cmp_row_->SetTuple(0, prev_input_row_->GetTuple(0)); child_tuple_cmp_row_->SetTuple(1, row->GetTuple(0)); @@ -663,22 +662,23 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) { // Every row is compared against the previous row to determine if (a) the row // starts a new partition or (b) the row does not share the same values for the - // ordering exprs. 
When either of these occurs, the evaluators_ are finalized and - // the result tuple is added to result_tuples_ so that it may be added to output - // rows in GetNextOutputBatch(). When a new partition is found (a), a new, empty - // result tuple is created and initialized over the evaluators_. If the row has - // different values for the ordering exprs (b), then a new tuple is created but - // copied from curr_tuple_ because the original is used for one or more previous - // row(s) but the incremental state still applies to the current row. + // ordering exprs. When either of these occurs, the analytic_fn_evals_ are + // finalized and the result tuple is added to result_tuples_ so that it may be + // added to output rows in GetNextOutputBatch(). When a new partition is found + // (a), a new, empty result tuple is created and initialized over the + // analytic_fn_evals_. If the row has different values for the ordering + // exprs (b), then a new tuple is created but copied from curr_tuple_ because the + // original is used for one or more previous row(s) but the incremental state still + // applies to the current row. bool next_partition = false; - if (partition_by_eq_expr_ctx_ != NULL) { - // partition_by_eq_expr_ctx_ checks equality over the predicate exprs - next_partition = !PrevRowCompare(partition_by_eq_expr_ctx_); + if (partition_by_eq_expr_eval_ != nullptr) { + // partition_by_eq_expr_eval_ checks equality over the predicate exprs + next_partition = !PrevRowCompare(partition_by_eq_expr_eval_); } RETURN_IF_ERROR(TryAddResultTupleForPrevRow(next_partition, stream_idx, row)); if (next_partition) RETURN_IF_ERROR(InitNextPartition(state, stream_idx)); - // The evaluators_ are updated with the current row. + // The analytic_fn_evals_ are updated with the current row. 
RETURN_IF_ERROR(AddRow(stream_idx, row)); RETURN_IF_ERROR(TryAddResultTupleForCurrRow(stream_idx, row)); @@ -709,8 +709,8 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) { return Status::OK(); } -Status AnalyticEvalNode::GetNextOutputBatch(RuntimeState* state, RowBatch* output_batch, - bool* eos) { +Status AnalyticEvalNode::GetNextOutputBatch( + RuntimeState* state, RowBatch* output_batch, bool* eos) { SCOPED_TIMER(evaluation_timer_); VLOG_FILE << id() << " GetNextOutputBatch: " << DebugStateString() << " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes(); @@ -793,7 +793,7 @@ Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* bool output_eos = false; RETURN_IF_ERROR(GetNextOutputBatch(state, row_batch, &output_eos)); - if (curr_child_batch_.get() == NULL && output_eos) { + if (curr_child_batch_.get() == nullptr && output_eos) { // Transfer the ownership of all row-backing resources on eos for simplicity. // TODO: This transfer is simple and correct, but not necessarily efficient. We // should optimize the use/transfer of memory to better amortize allocations @@ -837,20 +837,20 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) { DCHECK_EQ(prev_tuple_pool_->total_allocated_bytes(), 0); // Call Finalize() to clear evaluator allocations, but do not Close() them, // so we can keep evaluating them. - if (curr_tuple_ != NULL) { - for (int i = 0; i < evaluators_.size(); ++i) { - evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_); + if (curr_tuple_ != nullptr) { + for (int i = 0; i < analytic_fn_evals_.size(); ++i) { + analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_); } } mem_pool_->Clear(); // The following members will be re-created in Open(). // input_stream_ should have been closed by last GetNext() call. 
- DCHECK(input_stream_ == NULL || input_stream_->is_closed()); + DCHECK(input_stream_ == nullptr || input_stream_->is_closed()); input_stream_.reset(); - curr_tuple_ = NULL; - child_tuple_cmp_row_ = NULL; - dummy_result_tuple_ = NULL; - prev_input_row_ = NULL; + curr_tuple_ = nullptr; + child_tuple_cmp_row_ = nullptr; + dummy_result_tuple_ = nullptr; + prev_input_row_ = nullptr; prev_child_batch_.reset(); curr_child_batch_.reset(); return ExecNode::Reset(state); @@ -858,37 +858,43 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) { void AnalyticEvalNode::Close(RuntimeState* state) { if (is_closed()) return; - if (client_ != NULL) state->block_mgr()->ClearReservations(client_); + if (client_ != nullptr) state->block_mgr()->ClearReservations(client_); // We may need to clean up input_stream_ if an error occurred at some point. - if (input_stream_ != NULL) { - input_stream_->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + if (input_stream_ != nullptr) { + input_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } - // Close all evaluators and fn ctxs. If an error occurred in Init or Prepare there may - // be fewer ctxs than evaluators. We also need to Finalize if curr_tuple_ was created - // in Open. - DCHECK_LE(fn_ctxs_.size(), evaluators_.size()); - DCHECK(curr_tuple_ == NULL || fn_ctxs_.size() == evaluators_.size()); - for (int i = 0; i < evaluators_.size(); ++i) { + // Close all evaluators. If an error occurred in Init or Prepare there may be + // fewer evaluators than analytic fns. We also need to Finalize if curr_tuple_ was + // created in Open(). + DCHECK_LE(analytic_fn_evals_.size(), analytic_fns_.size()); + DCHECK(curr_tuple_ == nullptr || + analytic_fn_evals_.size() == analytic_fns_.size()); + for (int i = 0; i < analytic_fn_evals_.size(); ++i) { // Need to make sure finalize is called in case there is any state to clean up.
- if (curr_tuple_ != NULL) { - evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_); + if (curr_tuple_ != nullptr) { + analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_); } - evaluators_[i]->Close(state); - } - for (int i = 0; i < fn_ctxs_.size(); ++i) { - fn_ctxs_[i]->impl()->FreeLocalAllocations(); - fn_ctxs_[i]->impl()->Close(); + analytic_fn_evals_[i]->Close(state); } + AggFn::Close(analytic_fns_); - if (partition_by_eq_expr_ctx_ != NULL) partition_by_eq_expr_ctx_->Close(state); - if (order_by_eq_expr_ctx_ != NULL) order_by_eq_expr_ctx_->Close(state); - if (prev_child_batch_.get() != NULL) prev_child_batch_.reset(); - if (curr_child_batch_.get() != NULL) curr_child_batch_.reset(); - if (curr_tuple_pool_.get() != NULL) curr_tuple_pool_->FreeAll(); - if (prev_tuple_pool_.get() != NULL) prev_tuple_pool_->FreeAll(); - if (mem_pool_.get() != NULL) mem_pool_->FreeAll(); - if (fn_pool_.get() != NULL) fn_pool_->FreeAll(); + if (partition_by_eq_expr_ != nullptr) { + if (partition_by_eq_expr_eval_ != nullptr) { + partition_by_eq_expr_eval_->Close(state); + } + partition_by_eq_expr_->Close(); + } + if (order_by_eq_expr_ != nullptr) { + if (order_by_eq_expr_eval_ != nullptr) order_by_eq_expr_eval_->Close(state); + order_by_eq_expr_->Close(); + } + if (prev_child_batch_.get() != nullptr) prev_child_batch_.reset(); + if (curr_child_batch_.get() != nullptr) curr_child_batch_.reset(); + if (curr_tuple_pool_.get() != nullptr) curr_tuple_pool_->FreeAll(); + if (prev_tuple_pool_.get() != nullptr) prev_tuple_pool_->FreeAll(); + if (mem_pool_.get() != nullptr) mem_pool_->FreeAll(); + if (fn_pool_.get() != nullptr) fn_pool_->FreeAll(); ExecNode::Close(state); } @@ -896,22 +902,19 @@ void AnalyticEvalNode::DebugString(int indentation_level, stringstream* out) con *out << string(indentation_level * 2, ' '); *out << "AnalyticEvalNode(" << " window=" << DebugWindowString(); - if (partition_by_eq_expr_ctx_ != NULL) { - *out << " partition_exprs=" << 
partition_by_eq_expr_ctx_->root()->DebugString(); + if (partition_by_eq_expr_ != nullptr) { + *out << " partition_exprs=" << partition_by_eq_expr_->DebugString(); } - if (order_by_eq_expr_ctx_ != NULL) { - *out << " order_by_exprs=" << order_by_eq_expr_ctx_->root()->DebugString(); + if (order_by_eq_expr_ != nullptr) { + *out << " order_by_exprs=" << order_by_eq_expr_->DebugString(); } - *out << AggFnEvaluator::DebugString(evaluators_); + *out << AggFn::DebugString(analytic_fns_); ExecNode::DebugString(indentation_level, out); *out << ")"; } Status AnalyticEvalNode::QueryMaintenance(RuntimeState* state) { - for (int i = 0; i < evaluators_.size(); ++i) { - ExprContext::FreeLocalAllocations(evaluators_[i]->input_expr_ctxs()); - } - ExprContext::FreeLocalAllocations(fn_ctxs_); + AggFnEvaluator::FreeLocalAllocations(analytic_fn_evals_); return ExecNode::QueryMaintenance(state); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/analytic-eval-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h index 0203175..89c5cf3 100644 --- a/be/src/exec/analytic-eval-node.h +++ b/be/src/exec/analytic-eval-node.h @@ -19,15 +19,16 @@ #define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H #include "exec/exec-node.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" #include "runtime/buffered-block-mgr.h" #include "runtime/buffered-tuple-stream.h" #include "runtime/tuple.h" namespace impala { +class AggFn; class AggFnEvaluator; +class ScalarExpr; +class ScalarExprEvaluator; /// Evaluates analytic functions with a single pass over input rows. It is assumed /// that the input has already been sorted on all of the partition exprs and then the @@ -71,7 +72,7 @@ class AnalyticEvalNode : public ExecNode { virtual void Close(RuntimeState* state); protected: - /// Frees local allocations from evaluators_ + /// Frees local allocations from analytic_fn_evals_ virtual Status QueryMaintenance(RuntimeState* state); virtual void DebugString(int indentation_level, std::stringstream* out) const; @@ -162,7 +163,7 @@ class AnalyticEvalNode : public ExecNode { Status InitNextPartition(RuntimeState* state, int64_t stream_idx); /// Produces a result tuple with analytic function results by calling GetValue() or - /// Finalize() for 'curr_tuple_' on the 'evaluators_'. The result tuple is stored in + /// Finalize() for 'curr_tuple_' on the 'evaluators'. The result tuple is stored in /// 'result_tuples_' with the index into 'input_stream_' specified by 'stream_idx'. /// Returns an error when memory limit is exceeded. Status AddResultTuple(int64_t stream_idx); @@ -175,9 +176,10 @@ class AnalyticEvalNode : public ExecNode { /// This is necessary to produce the default value (set by Init()). 
void ResetLeadFnSlots(); - /// Evaluates the predicate pred_ctx over child_tuple_cmp_row_, which is a TupleRow* - /// containing the previous row and the current row set during ProcessChildBatch(). - bool PrevRowCompare(ExprContext* pred_ctx); + /// Evaluates the predicate pred_eval over child_tuple_cmp_row_, which is + /// a TupleRow* containing the previous row and the current row set during + /// ProcessChildBatch(). + bool PrevRowCompare(ScalarExprEvaluator* pred_eval); /// Debug string containing current state. If 'detailed', per-row state is included. std::string DebugStateString(bool detailed) const; @@ -193,23 +195,25 @@ class AnalyticEvalNode : public ExecNode { const TAnalyticWindow window_; /// Tuple descriptor for storing intermediate values of analytic fn evaluation. - const TupleDescriptor* intermediate_tuple_desc_; + const TupleDescriptor* intermediate_tuple_desc_ = nullptr; /// Tuple descriptor for storing results of analytic fn evaluation. - const TupleDescriptor* result_tuple_desc_; + const TupleDescriptor* result_tuple_desc_ = nullptr; /// Tuple descriptor of the buffered tuple (identical to the input child tuple, which is /// assumed to come from a single SortNode). NULL if both partition_exprs and /// order_by_exprs are empty. - TupleDescriptor* buffered_tuple_desc_; + TupleDescriptor* buffered_tuple_desc_ = nullptr; - /// Expr context for a predicate that checks if child tuple '<' buffered tuple for - /// partitioning exprs. - ExprContext* partition_by_eq_expr_ctx_; + /// A predicate that checks if child tuple '<' buffered tuple for partitioning exprs + /// and its evaluator. + ScalarExpr* partition_by_eq_expr_ = nullptr; + ScalarExprEvaluator* partition_by_eq_expr_eval_ = nullptr; - /// Expr context for a predicate that checks if child tuple '<' buffered tuple for - /// order by exprs. - ExprContext* order_by_eq_expr_ctx_; + /// A predicate that checks if child tuple '<' buffered tuple for order by exprs and + /// its evaluator. 
+ ScalarExpr* order_by_eq_expr_ = nullptr; + ScalarExprEvaluator* order_by_eq_expr_eval_ = nullptr; /// The scope over which analytic functions are evaluated. /// TODO: Consider adding additional state to capture whether different kinds of window @@ -222,8 +226,10 @@ class AnalyticEvalNode : public ExecNode { int64_t rows_start_offset_; int64_t rows_end_offset_; - /// Analytic function evaluators. - std::vector<AggFnEvaluator*> evaluators_; + /// Analytic functions and their evaluators. 'analytic_fns_' live in the query-state's + /// objpool while the evaluators live in the exec node's objpool. + std::vector<AggFn*> analytic_fns_; + std::vector<AggFnEvaluator*> analytic_fn_evals_; /// Indicates if each evaluator is the lead() fn. Used by ResetLeadFnSlots() to /// determine which slots need to be reset. @@ -234,10 +240,6 @@ class AnalyticEvalNode : public ExecNode { bool has_first_val_null_offset_; long first_val_null_offset_; - /// FunctionContext for each analytic function. String data returned by the analytic - /// functions is allocated via these contexts. - std::vector<impala_udf::FunctionContext*> fn_ctxs_; - /// Mem pool backing allocations from fn_ctxs_. This pool must not be Reset() because /// the memory is managed by the FreePools of the function contexts which do their own /// bookkeeping using a pointer-based structure stored in the memory blocks themselves. @@ -253,15 +255,15 @@ class AnalyticEvalNode : public ExecNode { boost::scoped_ptr<MemPool> prev_tuple_pool_; /// Block manager client used by input_stream_. Not owned. - BufferedBlockMgr::Client* client_; + BufferedBlockMgr::Client* client_ = nullptr; ///////////////////////////////////////// /// BEGIN: Members that must be Reset() /// TupleRow* composed of the first child tuple and the buffered tuple, used by - /// partition_by_eq_expr_ctx_ and order_by_eq_expr_ctx_. Set in Open() if + /// partition_by_eq_expr_eval_ and order_by_eq_expr_eval_. 
Set in Open() if /// buffered_tuple_desc_ is not NULL, allocated from mem_pool_. - TupleRow* child_tuple_cmp_row_; + TupleRow* child_tuple_cmp_row_ = nullptr; /// Queue of tuples which are ready to be set in output rows, with the index into /// the input_stream_ stream of the last TupleRow that gets the Tuple, i.e. this is a @@ -300,23 +302,23 @@ class AnalyticEvalNode : public ExecNode { int64_t prev_pool_last_window_idx_; /// The tuple described by intermediate_tuple_desc_ storing intermediate state for the - /// evaluators_. When enough input rows have been consumed to produce the analytic - /// function results, a result tuple (described by result_tuple_desc_) is created and - /// the agg fn results are written to that tuple by calling Finalize()/GetValue() - /// on the evaluators with curr_tuple_ as the source tuple. - Tuple* curr_tuple_; + /// analytic_eval_fns_. When enough input rows have been consumed to produce the + /// analytic function results, a result tuple (described by result_tuple_desc_) is + /// created and the agg fn results are written to that tuple by calling Finalize()/ + /// GetValue() on the evaluators with curr_tuple_ as the source tuple. + Tuple* curr_tuple_ = nullptr; /// A tuple described by result_tuple_desc_ used when calling Finalize() on the - /// evaluators_ to release resources between partitions; the value is never used. + /// analytic_fn_evals_ to release resources between partitions; the value is never used. /// TODO: Remove when agg fns implement a separate Close() method to release resources. - Tuple* dummy_result_tuple_; + Tuple* dummy_result_tuple_ = nullptr; /// Index of the row in input_stream_ at which the current partition started. int64_t curr_partition_idx_; /// Previous input row used to compare partition boundaries and to determine when the /// order-by expressions change. - TupleRow* prev_input_row_; + TupleRow* prev_input_row_ = nullptr; /// Current and previous input row batches from the child. 
RowBatches are allocated /// once and reused. Previous input row batch owns prev_input_row_ between calls to http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/blocking-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc index 6e73f77..486c5cc 100644 --- a/be/src/exec/blocking-join-node.cc +++ b/be/src/exec/blocking-join-node.cc @@ -20,7 +20,7 @@ #include <sstream> #include "exec/data-sink.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "runtime/fragment-instance-state.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" @@ -55,7 +55,7 @@ Status BlockingJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Init(tnode, state)); DCHECK((join_op_ != TJoinOp::LEFT_SEMI_JOIN && join_op_ != TJoinOp::LEFT_ANTI_JOIN && join_op_ != TJoinOp::RIGHT_SEMI_JOIN && join_op_ != TJoinOp::RIGHT_ANTI_JOIN && - join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) || conjunct_ctxs_.size() == 0); + join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) || conjuncts_.size() == 0); runtime_profile_->AddLocalTimeCounter( bind<int64_t>(&BlockingJoinNode::LocalTimeCounterFn, runtime_profile_->total_time_counter(), http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/blocking-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h index c184857..a14d979 100644 --- a/be/src/exec/blocking-join-node.h +++ b/be/src/exec/blocking-join-node.h @@ -30,8 +30,6 @@ namespace impala { -class MemPool; -class MemTracker; class RowBatch; class TupleRow; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/data-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.cc 
b/be/src/exec/data-sink.cc index dad6247..437343e 100644 --- a/be/src/exec/data-sink.cc +++ b/be/src/exec/data-sink.cc @@ -27,7 +27,7 @@ #include "exec/kudu-table-sink.h" #include "exec/kudu-util.h" #include "exec/plan-root-sink.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "gen-cpp/ImpalaInternalService_constants.h" #include "gen-cpp/ImpalaInternalService_types.h" #include "gutil/strings/substitute.h" @@ -48,64 +48,64 @@ DataSink::~DataSink() { DCHECK(closed_); } -Status DataSink::Create(ObjectPool* pool, - const TPlanFragmentCtx& fragment_ctx, - const TPlanFragmentInstanceCtx& fragment_instance_ctx, - const RowDescriptor& row_desc, DataSink** sink) { +Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx, + const TPlanFragmentInstanceCtx& fragment_instance_ctx, const RowDescriptor& row_desc, + RuntimeState* state, DataSink** sink) { const TDataSink& thrift_sink = fragment_ctx.fragment.output_sink; - const vector<TExpr>& output_exprs = fragment_ctx.fragment.output_exprs; + const vector<TExpr>& thrift_output_exprs = fragment_ctx.fragment.output_exprs; + ObjectPool* pool = state->obj_pool(); switch (thrift_sink.type) { case TDataSinkType::DATA_STREAM_SINK: - if (!thrift_sink.__isset.stream_sink) { - return Status("Missing data stream sink."); - } + if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink."); // TODO: figure out good buffer size based on size of output row - *sink = pool->Add( - new DataStreamSender(pool, - fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink, - fragment_ctx.destinations, 16 * 1024)); + *sink = pool->Add(new DataStreamSender(fragment_instance_ctx.sender_id, row_desc, + thrift_sink.stream_sink, fragment_ctx.destinations, 16 * 1024)); break; - case TDataSinkType::TABLE_SINK: if (!thrift_sink.__isset.table_sink) return Status("Missing table sink."); switch (thrift_sink.table_sink.type) { case TTableSinkType::HDFS: - *sink = pool->Add(new HdfsTableSink(row_desc, 
output_exprs, thrift_sink)); + *sink = pool->Add(new HdfsTableSink(row_desc, thrift_sink)); break; case TTableSinkType::HBASE: - *sink = pool->Add(new HBaseTableSink(row_desc, output_exprs, thrift_sink)); + *sink = pool->Add(new HBaseTableSink(row_desc, thrift_sink)); break; case TTableSinkType::KUDU: RETURN_IF_ERROR(CheckKuduAvailability()); - *sink = pool->Add(new KuduTableSink(row_desc, output_exprs, thrift_sink)); + *sink = pool->Add(new KuduTableSink(row_desc, thrift_sink)); break; default: stringstream error_msg; - const char* str = "Unknown table sink"; map<int, const char*>::const_iterator i = _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type); - if (i != _TTableSinkType_VALUES_TO_NAMES.end()) str = i->second; + const char* str = i != _TTableSinkType_VALUES_TO_NAMES.end() ? + i->second : "Unknown table sink"; error_msg << str << " not implemented."; return Status(error_msg.str()); } - break; case TDataSinkType::PLAN_ROOT_SINK: - *sink = pool->Add(new PlanRootSink(row_desc, output_exprs, thrift_sink)); + *sink = pool->Add(new PlanRootSink(row_desc)); break; default: stringstream error_msg; map<int, const char*>::const_iterator i = _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type); - const char* str = "Unknown data sink type "; - if (i != _TDataSinkType_VALUES_TO_NAMES.end()) str = i->second; + const char* str = i != _TDataSinkType_VALUES_TO_NAMES.end() ? 
+ i->second : "Unknown data sink type "; error_msg << str << " not implemented."; return Status(error_msg.str()); } + RETURN_IF_ERROR((*sink)->Init(thrift_output_exprs, thrift_sink, state)); return Status::OK(); } +Status DataSink::Init(const vector<TExpr>& thrift_output_exprs, + const TDataSink& tsink, RuntimeState* state) { + return ScalarExpr::Create(thrift_output_exprs, row_desc_, state, &output_exprs_); +} + void DataSink::MergeDmlStats(const TInsertStats& src_stats, TInsertStats* dst_stats) { dst_stats->bytes_written += src_stats.bytes_written; @@ -114,7 +114,6 @@ void DataSink::MergeDmlStats(const TInsertStats& src_stats, if (!dst_stats->kudu_stats.__isset.num_row_errors) { dst_stats->kudu_stats.__set_num_row_errors(0); } - dst_stats->kudu_stats.__set_num_row_errors( dst_stats->kudu_stats.num_row_errors + src_stats.kudu_stats.num_row_errors); } @@ -177,11 +176,22 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { mem_tracker_.reset(new MemTracker(profile_, -1, name, parent_mem_tracker)); expr_mem_tracker_.reset( new MemTracker(-1, Substitute("$0 Exprs", name), mem_tracker_.get(), false)); + expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get())); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_exprs_, state, state->obj_pool(), + expr_mem_pool(), &output_expr_evals_)); return Status::OK(); } +Status DataSink::Open(RuntimeState* state) { + DCHECK_EQ(output_exprs_.size(), output_expr_evals_.size()); + return ScalarExprEvaluator::Open(output_expr_evals_, state); +} + void DataSink::Close(RuntimeState* state) { if (closed_) return; + ScalarExprEvaluator::Close(output_expr_evals_, state); + ScalarExpr::Close(output_exprs_); + if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll(); if (expr_mem_tracker_ != NULL) { expr_mem_tracker_->UnregisterFromParent(); expr_mem_tracker_.reset(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/data-sink.h 
---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h index f20c40b..cfd06bb 100644 --- a/be/src/exec/data-sink.h +++ b/be/src/exec/data-sink.h @@ -29,10 +29,14 @@ namespace impala { +class MemPool; class ObjectPool; class RowBatch; class RuntimeProfile; +class RuntimeState; class RowDescriptor; +class ScalarExpr; +class ScalarExprEvaluator; class TDataSink; class TPlanExecRequest; class TPlanExecParams; @@ -57,12 +61,13 @@ class DataSink { virtual std::string GetName() = 0; /// Setup. Call before Send(), Open(), or Close() during the prepare phase of the query - /// fragment. Creates a MemTracker (in obj_pool) for the sink that is a child of - /// 'parent_mem_tracker'. Subclasses must call DataSink::Prepare(). + /// fragment. Creates a MemTracker for the sink that is a child of 'parent_mem_tracker'. + /// Also creates a MemTracker and MemPool for the output (and partitioning) expr and + /// initializes their evaluators. Subclasses must call DataSink::Prepare(). virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker); - /// Call before Send() to open the sink. - virtual Status Open(RuntimeState* state) = 0; + /// Call before Send() to open the sink and initialize output expression evaluators. + virtual Status Open(RuntimeState* state); /// Send a row batch into this sink. Send() may modify 'batch' by acquiring its state. virtual Status Send(RuntimeState* state, RowBatch* batch) = 0; @@ -79,10 +84,9 @@ class DataSink { /// Creates a new data sink, allocated in pool and returned through *sink, from /// thrift_sink. 
- static Status Create(ObjectPool* pool, - const TPlanFragmentCtx& fragment_ctx, - const TPlanFragmentInstanceCtx& fragment_instance_ctx, - const RowDescriptor& row_desc, DataSink** sink); + static Status Create(const TPlanFragmentCtx& fragment_ctx, + const TPlanFragmentInstanceCtx& fragment_instance_ctx, + const RowDescriptor& row_desc, RuntimeState* state, DataSink** sink); /// Merges one update to the DML stats for a partition. dst_stats will have the /// combined stats of src_stats and dst_stats after this method returns. @@ -95,6 +99,10 @@ class DataSink { MemTracker* mem_tracker() const { return mem_tracker_.get(); } RuntimeProfile* profile() const { return profile_; } + MemPool* expr_mem_pool() const { return expr_mem_pool_.get(); } + const std::vector<ScalarExprEvaluator*>& output_expr_evals() const { + return output_expr_evals_; + } protected: /// Set to true after Close() has been called. Subclasses should check and set this in @@ -112,7 +120,18 @@ class DataSink { /// A child of 'mem_tracker_' that tracks expr allocations. Initialized in Prepare(). boost::scoped_ptr<MemTracker> expr_mem_tracker_; -}; + /// MemPool for backing data structures in expressions and their evaluators. + boost::scoped_ptr<MemPool> expr_mem_pool_; + + /// Output expressions to convert row batches onto output values. + /// Not used in some sub-classes. + std::vector<ScalarExpr*> output_exprs_; + std::vector<ScalarExprEvaluator*> output_expr_evals_; + + /// Initialize the expressions in the data sink and return error status on failure. 
+ virtual Status Init(const std::vector<TExpr>& thrift_output_exprs, + const TDataSink& tsink, RuntimeState* state); +}; } // namespace impala #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/data-source-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc index c0a9287..01e0dbe 100644 --- a/be/src/exec/data-source-scan-node.cc +++ b/be/src/exec/data-source-scan-node.cc @@ -22,12 +22,13 @@ #include "exec/parquet-common.h" #include "exec/read-write-util.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" #include "runtime/runtime-state.h" #include "runtime/row-batch.h" #include "runtime/string-value.h" +#include "runtime/timestamp-value.h" #include "runtime/tuple-row.h" #include "util/jni-util.h" #include "util/periodic-counter-updater.h" @@ -147,7 +148,7 @@ Status DataSourceScanNode::GetNextInputBatch() { input_batch_.reset(new TGetNextResult()); next_row_idx_ = 0; // Reset all the indexes into the column value arrays to 0 - memset(&cols_next_val_idx_[0], 0, sizeof(int) * cols_next_val_idx_.size()); + memset(cols_next_val_idx_.data(), 0, sizeof(int) * cols_next_val_idx_.size()); TGetNextParams params; params.__set_scan_handle(scan_handle_); RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get())); @@ -320,8 +321,9 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo RETURN_IF_ERROR( row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, &tuple_buffer)); Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer); - ExprContext** ctxs = &conjunct_ctxs_[0]; - int num_ctxs = conjunct_ctxs_.size(); + ScalarExprEvaluator* const* evals = conjunct_evals_.data(); + int num_conjuncts = conjuncts_.size(); + DCHECK_EQ(num_conjuncts, conjunct_evals_.size()); while (true) { { @@ -333,7 +335,7 @@ 
Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo TupleRow* tuple_row = row_batch->GetRow(row_idx); tuple_row->SetTuple(tuple_idx_, tuple); - if (ExecNode::EvalConjuncts(ctxs, num_ctxs, tuple_row)) { + if (ExecNode::EvalConjuncts(evals, num_conjuncts, tuple_row)) { row_batch->CommitLastRow(); tuple = reinterpret_cast<Tuple*>( reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/exchange-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc index 6b5e52b..4d32f80 100644 --- a/be/src/exec/exchange-node.cc +++ b/be/src/exec/exchange-node.cc @@ -59,7 +59,8 @@ Status ExchangeNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Init(tnode, state)); if (!is_merging_) return Status::OK(); - RETURN_IF_ERROR(sort_exec_exprs_.Init(tnode.exchange_node.sort_info, pool_)); + RETURN_IF_ERROR(ScalarExpr::Create(tnode.exchange_node.sort_info.ordering_exprs, + row_descriptor_, state, &ordering_exprs_)); is_asc_order_ = tnode.exchange_node.sort_info.is_asc_order; nulls_first_ = tnode.exchange_node.sort_info.nulls_first; return Status::OK(); @@ -81,11 +82,8 @@ Status ExchangeNode::Prepare(RuntimeState* state) { input_row_desc_, state->fragment_instance_id(), id_, num_senders_, FLAGS_exchg_node_buffer_size_bytes, runtime_profile(), is_merging_); if (is_merging_) { - RETURN_IF_ERROR(sort_exec_exprs_.Prepare( - state, row_descriptor_, row_descriptor_, expr_mem_tracker())); - AddExprCtxsToFree(sort_exec_exprs_); less_than_.reset( - new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_)); + new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_)); AddCodegenDisabledMessage(state); } return Status::OK(); @@ -106,9 +104,9 @@ Status ExchangeNode::Open(RuntimeState* state) { 
SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); if (is_merging_) { - RETURN_IF_ERROR(sort_exec_exprs_.Open(state)); // CreateMerger() will populate its merging heap with batches from the stream_recvr_, // so it is not necessary to call FillInputRowBatch(). + RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool())); RETURN_IF_ERROR(stream_recvr_->CreateMerger(*less_than_.get())); } else { RETURN_IF_ERROR(FillInputRowBatch(state)); @@ -123,12 +121,18 @@ Status ExchangeNode::Reset(RuntimeState* state) { void ExchangeNode::Close(RuntimeState* state) { if (is_closed()) return; - if (is_merging_) sort_exec_exprs_.Close(state); - if (stream_recvr_ != NULL) stream_recvr_->Close(); + if (less_than_.get() != nullptr) less_than_->Close(state); + if (stream_recvr_ != nullptr) stream_recvr_->Close(); stream_recvr_.reset(); + ScalarExpr::Close(ordering_exprs_); ExecNode::Close(state); } +Status ExchangeNode::QueryMaintenance(RuntimeState* state) { + if (less_than_.get() != nullptr) less_than_->FreeLocalAllocations(); + return ExecNode::QueryMaintenance(state); +} + Status ExchangeNode::FillInputRowBatch(RuntimeState* state) { DCHECK(!is_merging_); Status ret_status; @@ -204,7 +208,7 @@ Status ExchangeNode::GetNextMerging(RuntimeState* state, RowBatch* output_batch, RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(stream_recvr_->GetNext(output_batch, eos)); - while ((num_rows_skipped_ < offset_)) { + while (num_rows_skipped_ < offset_) { num_rows_skipped_ += output_batch->num_rows(); // Throw away rows in the output batch until the offset is skipped. 
int rows_to_keep = num_rows_skipped_ - offset_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/exchange-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h index 6feaff3..4781213 100644 --- a/be/src/exec/exchange-node.h +++ b/be/src/exec/exchange-node.h @@ -21,12 +21,12 @@ #include <boost/scoped_ptr.hpp> #include "exec/exec-node.h" -#include "exec/sort-exec-exprs.h" namespace impala { -class RowBatch; class DataStreamRecvr; +class RowBatch; +class ScalarExpr; class TupleRowComparator; /// Receiver node for data streams. The data stream receiver is created in Prepare() @@ -58,6 +58,7 @@ class ExchangeNode : public ExecNode { void set_num_senders(int num_senders) { num_senders_ = num_senders; } protected: + virtual Status QueryMaintenance(RuntimeState* state); virtual void DebugString(int indentation_level, std::stringstream* out) const; private: @@ -100,7 +101,7 @@ class ExchangeNode : public ExecNode { boost::scoped_ptr<TupleRowComparator> less_than_; /// Sort expressions and parameters passed to the merging receiver.. 
- SortExecExprs sort_exec_exprs_; + std::vector<ScalarExpr*> ordering_exprs_; std::vector<bool> is_asc_order_; std::vector<bool> nulls_first_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 4c06ece..1505f71 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -26,7 +26,8 @@ #include "codegen/llvm-codegen.h" #include "common/object-pool.h" #include "common/status.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "exec/aggregation-node.h" #include "exec/analytic-eval-node.h" #include "exec/data-source-scan-node.h" @@ -73,8 +74,8 @@ int ExecNode::GetNodeIdFromProfile(RuntimeProfile* p) { return p->metadata(); } -ExecNode::RowBatchQueue::RowBatchQueue(int max_batches) : - BlockingQueue<RowBatch*>(max_batches) { +ExecNode::RowBatchQueue::RowBatchQueue(int max_batches) + : BlockingQueue<RowBatch*>(max_batches) { } ExecNode::RowBatchQueue::~RowBatchQueue() { @@ -140,26 +141,26 @@ ExecNode::~ExecNode() { Status ExecNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR( - Expr::CreateExprTrees(pool_, tnode.conjuncts, &conjunct_ctxs_)); + ScalarExpr::Create(tnode.conjuncts, row_descriptor_, state, &conjuncts_)); return Status::OK(); } Status ExecNode::Prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::PREPARE, state)); DCHECK(runtime_profile_.get() != NULL); - rows_returned_counter_ = - ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT); mem_tracker_.reset(new MemTracker(runtime_profile_.get(), -1, runtime_profile_->name(), state->instance_mem_tracker())); expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false)); - + expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get())); + rows_returned_counter_ = ADD_COUNTER(runtime_profile_, 
"RowsReturned", TUnit::UNIT); rows_returned_rate_ = runtime_profile()->AddDerivedCounter( ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_returned_counter_, - runtime_profile()->total_time_counter())); - - RETURN_IF_ERROR(Expr::Prepare(conjunct_ctxs_, state, row_desc(), expr_mem_tracker())); - AddExprCtxsToFree(conjunct_ctxs_); + runtime_profile()->total_time_counter())); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_, expr_mem_pool(), + &conjunct_evals_)); + DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size()); + AddEvaluatorsToFree(conjunct_evals_); for (int i = 0; i < children_.size(); ++i) { RETURN_IF_ERROR(children_[i]->Prepare(state)); } @@ -176,7 +177,8 @@ void ExecNode::Codegen(RuntimeState* state) { Status ExecNode::Open(RuntimeState* state) { RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::OPEN, state)); - return Expr::Open(conjunct_ctxs_, state); + DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size()); + return ScalarExprEvaluator::Open(conjunct_evals_, state); } Status ExecNode::Reset(RuntimeState* state) { @@ -197,7 +199,10 @@ void ExecNode::Close(RuntimeState* state) { for (int i = 0; i < children_.size(); ++i) { children_[i]->Close(state); } - Expr::Close(conjunct_ctxs_, state); + + ScalarExprEvaluator::Close(conjunct_evals_, state); + ScalarExpr::Close(conjuncts_); + if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll(); if (mem_tracker() != NULL && mem_tracker()->consumption() != 0) { LOG(WARNING) << "Query " << state->query_id() << " may have leaked memory." 
<< endl @@ -393,7 +398,7 @@ string ExecNode::DebugString() const { } void ExecNode::DebugString(int indentation_level, stringstream* out) const { - *out << " conjuncts=" << Expr::DebugString(conjunct_ctxs_); + *out << " conjuncts=" << ScalarExpr::DebugString(conjuncts_); for (int i = 0; i < children_.size(); ++i) { *out << "\n"; children_[i]->DebugString(indentation_level + 1, out); @@ -443,27 +448,25 @@ Status ExecNode::ExecDebugAction(TExecNodePhase::type phase, RuntimeState* state return Status::OK(); } -bool ExecNode::EvalConjuncts(ExprContext* const* ctxs, int num_ctxs, TupleRow* row) { - for (int i = 0; i < num_ctxs; ++i) { - BooleanVal v = ctxs[i]->GetBooleanVal(row); - if (v.is_null || !v.val) return false; +bool ExecNode::EvalConjuncts( + ScalarExprEvaluator* const* evals, int num_conjuncts, TupleRow* row) { + for (int i = 0; i < num_conjuncts; ++i) { + if (!EvalPredicate(evals[i], row)) return false; } return true; } Status ExecNode::QueryMaintenance(RuntimeState* state) { - FreeLocalAllocations(); + ScalarExprEvaluator::FreeLocalAllocations(evals_to_free_); return state->CheckQueryState(); } -void ExecNode::AddExprCtxsToFree(const vector<ExprContext*>& ctxs) { - for (int i = 0; i < ctxs.size(); ++i) AddExprCtxToFree(ctxs[i]); +void ExecNode::AddEvaluatorToFree(ScalarExprEvaluator* eval) { + evals_to_free_.push_back(eval); } -void ExecNode::AddExprCtxsToFree(const SortExecExprs& sort_exec_exprs) { - AddExprCtxsToFree(sort_exec_exprs.sort_tuple_slot_expr_ctxs()); - AddExprCtxsToFree(sort_exec_exprs.lhs_ordering_expr_ctxs()); - AddExprCtxsToFree(sort_exec_exprs.rhs_ordering_expr_ctxs()); +void ExecNode::AddEvaluatorsToFree(const vector<ScalarExprEvaluator*>& evals) { + for (ScalarExprEvaluator* eval : evals) AddEvaluatorToFree(eval); } void ExecNode::AddCodegenDisabledMessage(RuntimeState* state) { @@ -478,15 +481,19 @@ bool ExecNode::IsNodeCodegenDisabled() const { return disable_codegen_; } -// Codegen for EvalConjuncts. 
The generated signature is -// For a node with two conjunct predicates -// define i1 @EvalConjuncts(%"class.impala::ExprContext"** %ctxs, i32 %num_ctxs, -// %"class.impala::TupleRow"* %row) #20 { +// Codegen for EvalConjuncts. The generated signature is the same as EvalConjuncts(). +// +// For a node with two conjunct predicates: +// +// define i1 @EvalConjuncts(%"class.impala::ScalarExprEvaluator"** %evals, i32 %num_evals, +// %"class.impala::TupleRow"* %row) #34 { // entry: -// %ctx_ptr = getelementptr %"class.impala::ExprContext"** %ctxs, i32 0 -// %ctx = load %"class.impala::ExprContext"** %ctx_ptr -// %result = call i16 @Eq_StringVal_StringValWrapper3( -// %"class.impala::ExprContext"* %ctx, %"class.impala::TupleRow"* %row) +// %eval_ptr = getelementptr inbounds %"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %evals, i32 0 +// %eval = load %"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %eval_ptr +// %result = call i16 @"impala::Operators::Eq_BigIntVal_BigIntValWrapper"( +// %"class.impala::ScalarExprEvaluator"* %eval, %"class.impala::TupleRow"* %row) // %is_null = trunc i16 %result to i1 // %0 = ashr i16 %result, 8 // %1 = trunc i16 %0 to i8 @@ -496,30 +503,32 @@ bool ExecNode::IsNodeCodegenDisabled() const { // br i1 %return_false, label %false, label %continue // // continue: ; preds = %entry -// %ctx_ptr2 = getelementptr %"class.impala::ExprContext"** %ctxs, i32 1 -// %ctx3 = load %"class.impala::ExprContext"** %ctx_ptr2 -// %result4 = call i16 @Gt_BigIntVal_BigIntValWrapper5( -// %"class.impala::ExprContext"* %ctx3, %"class.impala::TupleRow"* %row) -// %is_null5 = trunc i16 %result4 to i1 -// %2 = ashr i16 %result4, 8 -// %3 = trunc i16 %2 to i8 -// %val6 = trunc i8 %3 to i1 -// %is_false7 = xor i1 %val6, true -// %return_false8 = or i1 %is_null5, %is_false7 -// br i1 %return_false8, label %false, label %continue1 +// %eval_ptr2 = getelementptr inbounds 
%"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %evals, i32 1 +// %eval3 = load %"class.impala::ScalarExprEvaluator"*, +// %"class.impala::ScalarExprEvaluator"** %eval_ptr2 +// %result4 = call i16 @"impala::Operators::Eq_StringVal_StringValWrapper"( +// %"class.impala::ScalarExprEvaluator"* %eval3, %"class.impala::TupleRow"* %row) +// %is_null5 = trunc i16 %result4 to i1 +// %2 = ashr i16 %result4, 8 +// %3 = trunc i16 %2 to i8 +// %val6 = trunc i8 %3 to i1 +// %is_false7 = xor i1 %val6, true +// %return_false8 = or i1 %is_null5, %is_false +// br i1 %return_false8, label %false, label %continue1 // // continue1: ; preds = %continue -// ret i1 true +// ret i1 true // // false: ; preds = %continue, %entry -// ret i1 false +// ret i1 false // } +// Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen, - const vector<ExprContext*>& conjunct_ctxs, Function** fn, const char* name) { - Function* conjunct_fns[conjunct_ctxs.size()]; - for (int i = 0; i < conjunct_ctxs.size(); ++i) { - RETURN_IF_ERROR( - conjunct_ctxs[i]->root()->GetCodegendComputeFn(codegen, &conjunct_fns[i])); + const vector<ScalarExpr*>& conjuncts, Function** fn, const char* name) { + Function* conjunct_fns[conjuncts.size()]; + for (int i = 0; i < conjuncts.size(); ++i) { + RETURN_IF_ERROR(conjuncts[i]->GetCodegendComputeFn(codegen, &conjunct_fns[i])); if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) { // Avoid bloating EvalConjuncts by inlining everything into it. 
codegen->SetNoInline(conjunct_fns[i]); @@ -527,43 +536,36 @@ Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen, } // Construct function signature to match - // bool EvalConjuncts(Expr** exprs, int num_exprs, TupleRow* row) - Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); - Type* expr_ctx_type = codegen->GetType(ExprContext::LLVM_CLASS_NAME); - - DCHECK(tuple_row_type != NULL); - DCHECK(expr_ctx_type != NULL); - - PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0); - PointerType* expr_ctx_ptr_type = PointerType::get(expr_ctx_type, 0); + // bool EvalConjuncts(ScalarExprEvaluator**, int, TupleRow*) + PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME); + Type* eval_type = codegen->GetType(ScalarExprEvaluator::LLVM_CLASS_NAME); LlvmCodeGen::FnPrototype prototype(codegen, name, codegen->GetType(TYPE_BOOLEAN)); prototype.AddArgument( - LlvmCodeGen::NamedVariable("ctxs", PointerType::get(expr_ctx_ptr_type, 0))); + LlvmCodeGen::NamedVariable("evals", codegen->GetPtrPtrType(eval_type))); prototype.AddArgument( - LlvmCodeGen::NamedVariable("num_ctxs", codegen->GetType(TYPE_INT))); + LlvmCodeGen::NamedVariable("num_evals", codegen->GetType(TYPE_INT))); prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); LlvmBuilder builder(codegen->context()); Value* args[3]; *fn = prototype.GeneratePrototype(&builder, args); - Value* ctxs_arg = args[0]; + Value* evals_arg = args[0]; Value* tuple_row_arg = args[2]; - if (conjunct_ctxs.size() > 0) { + if (conjuncts.size() > 0) { LLVMContext& context = codegen->context(); BasicBlock* false_block = BasicBlock::Create(context, "false", *fn); - for (int i = 0; i < conjunct_ctxs.size(); ++i) { + for (int i = 0; i < conjuncts.size(); ++i) { BasicBlock* true_block = BasicBlock::Create(context, "continue", *fn, false_block); - - Value* ctx_arg_ptr = builder.CreateConstGEP1_32(ctxs_arg, i, "ctx_ptr"); - Value* ctx_arg = 
builder.CreateLoad(ctx_arg_ptr, "ctx"); - Value* expr_args[] = { ctx_arg, tuple_row_arg }; + Value* eval_arg_ptr = builder.CreateInBoundsGEP(NULL, evals_arg, + codegen->GetIntConstant(TYPE_INT, i), "eval_ptr"); + Value* eval_arg = builder.CreateLoad(eval_arg_ptr, "eval"); // Call conjunct_fns[i] - CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped( - codegen, &builder, conjunct_ctxs[i]->root()->type(), conjunct_fns[i], expr_args, + CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder, + conjuncts[i]->type(), conjunct_fns[i], {eval_arg, tuple_row_arg}, "result"); // Return false if result.is_null || !result @@ -584,7 +586,7 @@ Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen, } // Avoid inlining EvalConjuncts into caller if it is large. - if (conjunct_ctxs.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { + if (conjuncts.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) { codegen->SetNoInline(*fn); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/exec-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index c769be3..ceb3c49 100644 --- a/be/src/exec/exec-node.h +++ b/be/src/exec/exec-node.h @@ -23,7 +23,7 @@ #include <vector> #include "common/status.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr-evaluator.h" #include "gen-cpp/PlanNodes_types.h" #include "runtime/descriptors.h" // for RowDescriptor #include "util/blocking-queue.h" @@ -31,24 +31,25 @@ namespace impala { -class Expr; -class ExprContext; +class DataSink; +class MemPool; +class MemTracker; class ObjectPool; -class Counters; -class SortExecExprs; class RowBatch; class RuntimeState; +class ScalarExpr; +class SubplanNode; class TPlan; class TupleRow; -class DataSink; -class MemTracker; -class SubplanNode; class TDebugOptions; /// Superclass of all executor nodes. 
/// All subclasses need to make sure to check RuntimeState::is_cancelled() /// periodically in order to ensure timely termination after the cancellation /// flag gets set. +/// TODO: Move static state of ExecNode into PlanNode, of which there is one instance +/// per fragment. ExecNode contains only runtime state and there can be up to MT_DOP +/// instances of it per fragment. class ExecNode { public: /// Init conjuncts. @@ -144,15 +145,18 @@ class ExecNode { /// Collect all scan node types. void CollectScanNodes(std::vector<ExecNode*>* nodes); - /// Evaluate ExprContexts over row. Returns true if all exprs return true. - /// TODO: This doesn't use the vector<Expr*> signature because I haven't figured - /// out how to deal with declaring a templated std:vector type in IR - static bool EvalConjuncts(ExprContext* const* ctxs, int num_ctxs, TupleRow* row); + /// Evaluates the predicate in 'eval' over 'row' and returns the result. + static bool EvalPredicate(ScalarExprEvaluator* eval, TupleRow* row); + + /// Evaluate the conjuncts in 'evaluators' over 'row'. + /// Returns true if all exprs return true. + static bool EvalConjuncts( + ScalarExprEvaluator* const* evals, int num_conjuncts, TupleRow* row); /// Codegen EvalConjuncts(). Returns a non-OK status if the function couldn't be /// codegen'd. The codegen'd version uses inlined, codegen'd GetBooleanVal() functions. static Status CodegenEvalConjuncts(LlvmCodeGen* codegen, - const std::vector<ExprContext*>& conjunct_ctxs, llvm::Function** fn, + const std::vector<ScalarExpr*>& conjuncts, llvm::Function** fn, const char* name = "EvalConjuncts") WARN_UNUSED_RESULT; /// Returns a string representation in DFS order of the plan rooted at this. @@ -166,7 +170,12 @@ class ExecNode { /// out: Stream to accumulate debug string. 
virtual void DebugString(int indentation_level, std::stringstream* out) const; - const std::vector<ExprContext*>& conjunct_ctxs() const { return conjunct_ctxs_; } + const std::vector<ScalarExpr*>& conjuncts() const { return conjuncts_; } + + const std::vector<ScalarExprEvaluator*>& conjunct_evals() const { + return conjunct_evals_; + } + int id() const { return id_; } TPlanNodeType::type type() const { return type_; } const RowDescriptor& row_desc() const { return row_descriptor_; } @@ -184,6 +193,7 @@ class ExecNode { RuntimeProfile* runtime_profile() { return runtime_profile_.get(); } MemTracker* mem_tracker() { return mem_tracker_.get(); } MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); } + MemPool* expr_mem_pool() { return expr_mem_pool_.get(); } /// Return true if codegen was disabled by the planner for this ExecNode. Does not /// check to see if codegen was enabled for the enclosing fragment. @@ -245,10 +255,14 @@ class ExecNode { /// Unique within a single plan tree. int id_; - TPlanNodeType::type type_; ObjectPool* pool_; - std::vector<ExprContext*> conjunct_ctxs_; + + /// Conjuncts and their evaluators in this node. 'conjuncts_' live in the + /// query-state's object pool while the evaluators live in this exec node's + /// object pool. + std::vector<ScalarExpr*> conjuncts_; + std::vector<ScalarExprEvaluator*> conjunct_evals_; std::vector<ExecNode*> children_; RowDescriptor row_descriptor_; @@ -268,9 +282,13 @@ class ExecNode { /// Account for peak memory used by this node boost::scoped_ptr<MemTracker> mem_tracker_; - /// MemTracker that should be used for ExprContexts. + /// MemTracker used by 'expr_mem_pool_'. boost::scoped_ptr<MemTracker> expr_mem_tracker_; + /// MemPool for allocating data structures used by expression evaluators in this node. + /// Created in Prepare(). + boost::scoped_ptr<MemPool> expr_mem_pool_; + bool is_closed() const { return is_closed_; } /// Pointer to the containing SubplanNode or NULL if not inside a subplan. 
@@ -302,7 +320,7 @@ class ExecNode { Status ExecDebugAction( TExecNodePhase::type phase, RuntimeState* state) WARN_UNUSED_RESULT; - /// Frees any local allocations made by expr_ctxs_to_free_ and returns the result of + /// Frees any local allocations made by evals_to_free_ and returns the result of /// state->CheckQueryState(). Nodes should call this periodically, e.g. once per input /// row batch. This should not be called outside the main execution thread. // @@ -311,26 +329,29 @@ class ExecNode { /// ExecNode::QueryMaintenance(). virtual Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT; - /// Add an ExprContext to have its local allocations freed by QueryMaintenance(). + /// Add an expr evaluator to have its local allocations freed by QueryMaintenance(). /// Exprs that are evaluated in the main execution thread should be added. Exprs /// evaluated in a separate thread are generally not safe to add, since a local /// allocation may be freed while it's being used. Rather than using this mechanism, - /// threads should call FreeLocalAllocations() on local ExprContexts periodically. - void AddExprCtxToFree(ExprContext* ctx) { expr_ctxs_to_free_.push_back(ctx); } - void AddExprCtxsToFree(const std::vector<ExprContext*>& ctxs); - void AddExprCtxsToFree(const SortExecExprs& sort_exec_exprs); - - /// Free any local allocations made by expr_ctxs_to_free_. - void FreeLocalAllocations() { ExprContext::FreeLocalAllocations(expr_ctxs_to_free_); } + /// threads should call FreeLocalAllocations() on local evaluators periodically. + void AddEvaluatorToFree(ScalarExprEvaluator* eval); + void AddEvaluatorsToFree(const std::vector<ScalarExprEvaluator*>& evals); private: /// Set in ExecNode::Close(). Used to make Close() idempotent. This is not protected /// by a lock, it assumes all calls to Close() are made by the same thread. bool is_closed_; - /// Expr contexts whose local allocations are safe to free in the main execution thread. 
- std::vector<ExprContext*> expr_ctxs_to_free_; + /// Expr evaluators whose local allocations are safe to free in the main execution + /// thread. + std::vector<ScalarExprEvaluator*> evals_to_free_; }; +inline bool ExecNode::EvalPredicate(ScalarExprEvaluator* eval, TupleRow* row) { + BooleanVal v = eval->GetBooleanVal(row); + if (v.is_null || !v.val) return false; + return true; +} + } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/filter-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc index 5c33ed4..0eee704 100644 --- a/be/src/exec/filter-context.cc +++ b/be/src/exec/filter-context.cc @@ -67,22 +67,23 @@ void FilterStats::RegisterCounterGroup(const string& key) { counters[key] = counter; } -Status FilterContext::CloneFrom(const FilterContext& from, RuntimeState* state) { +Status FilterContext::CloneFrom(const FilterContext& from, ObjectPool* pool, + RuntimeState* state, MemPool* mem_pool) { filter = from.filter; stats = from.stats; - return from.expr_ctx->Clone(state, &expr_ctx); + return from.expr_eval->Clone(pool, state, mem_pool, &expr_eval); } bool FilterContext::Eval(TupleRow* row) const noexcept { - void* e = expr_ctx->GetValue(row); - return filter->Eval(e, expr_ctx->root()->type()); + void* val = expr_eval->GetValue(row); + return filter->Eval(val, expr_eval->root().type()); } void FilterContext::Insert(TupleRow* row) const noexcept { if (local_bloom_filter == NULL) return; - void* e = expr_ctx->GetValue(row); + void* val = expr_eval->GetValue(row); uint32_t filter_hash = RawValue::GetHashValue( - e, expr_ctx->root()->type(), RuntimeFilterBank::DefaultHashSeed()); + val, expr_eval->root().type(), RuntimeFilterBank::DefaultHashSeed()); local_bloom_filter->Insert(filter_hash); } @@ -97,11 +98,11 @@ void FilterContext::Insert(TupleRow* row) const noexcept { // %"class.impala::TupleRow"* %row) #34 { // entry: // 
%0 = alloca i16 -// %expr_ctx_ptr = getelementptr inbounds %"struct.impala::FilterContext", +// %expr_eval_ptr = getelementptr inbounds %"struct.impala::FilterContext", // %"struct.impala::FilterContext"* %this, i32 0, i32 0 -// %expr_ctx_arg = load %"class.impala::ExprContext"*, -// %"class.impala::ExprContext"** %expr_ctx_ptr -// %result = call i32 @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx_arg, +// %expr_eval_arg = load %"class.impala::ExprContext"*, +// %"class.impala::ExprContext"** %expr_eval_ptr +// %result = call i32 @GetSlotRef(%"class.impala::ExprContext"* %expr_eval_arg, // %"class.impala::TupleRow"* %row) // %is_null1 = trunc i32 %result to i1 // br i1 %is_null1, label %is_null, label %not_null @@ -127,11 +128,12 @@ void FilterContext::Insert(TupleRow* row) const noexcept { // %"struct.impala::ColumnType"* @expr_type_arg) // ret i1 %passed_filter // } -Status FilterContext::CodegenEval(LlvmCodeGen* codegen, Function** fn) const { +Status FilterContext::CodegenEval(LlvmCodeGen* codegen, ScalarExpr* filter_expr, + Function** fn) { LLVMContext& context = codegen->context(); LlvmBuilder builder(context); - *fn = NULL; + *fn = nullptr; PointerType* this_type = codegen->GetPtrType(FilterContext::LLVM_CLASS_NAME); PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME); LlvmCodeGen::FnPrototype prototype(codegen, "FilterContextEval", @@ -149,24 +151,25 @@ Status FilterContext::CodegenEval(LlvmCodeGen* codegen, Function** fn) const { BasicBlock* eval_filter_block = BasicBlock::Create(context, "eval_filter", eval_filter_fn); - Expr* expr = expr_ctx->root(); Function* compute_fn; - RETURN_IF_ERROR(expr->GetCodegendComputeFn(codegen, &compute_fn)); - DCHECK(compute_fn != NULL); + RETURN_IF_ERROR(filter_expr->GetCodegendComputeFn(codegen, &compute_fn)); + DCHECK(compute_fn != nullptr); // The function for checking against the bloom filter for match. 
Function* runtime_filter_fn = codegen->GetFunction(IRFunction::RUNTIME_FILTER_EVAL, false); - DCHECK(runtime_filter_fn != NULL); + DCHECK(runtime_filter_fn != nullptr); - // Load 'expr_ctx' from 'this_arg' FilterContext object. - Value* expr_ctx_ptr = builder.CreateStructGEP(NULL, this_arg, 0, "expr_ctx_ptr"); - Value* expr_ctx_arg = builder.CreateLoad(expr_ctx_ptr, "expr_ctx_arg"); + // Load 'expr_eval' from 'this_arg' FilterContext object. + Value* expr_eval_ptr = + builder.CreateStructGEP(nullptr, this_arg, 0, "expr_eval_ptr"); + Value* expr_eval_arg = + builder.CreateLoad(expr_eval_ptr, "expr_eval_arg"); // Evaluate the row against the filter's expression. - Value* compute_fn_args[] = {expr_ctx_arg, row_arg}; + Value* compute_fn_args[] = {expr_eval_arg, row_arg}; CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder, - expr->type(), compute_fn, compute_fn_args, "result"); + filter_expr->type(), compute_fn, compute_fn_args, "result"); // Check if the result is NULL Value* is_null = result.GetIsNull(); @@ -192,11 +195,11 @@ Status FilterContext::CodegenEval(LlvmCodeGen* codegen, Function** fn) const { // Create a global constant of the filter expression's ColumnType. It needs to be a // constant for constant propagation and dead code elimination in 'runtime_filter_fn'. Type* col_type = codegen->GetType(ColumnType::LLVM_CLASS_NAME); - Constant* expr_type_arg = codegen->ConstantToGVPtr(col_type, expr->type().ToIR(codegen), - "expr_type_arg"); + Constant* expr_type_arg = codegen->ConstantToGVPtr(col_type, + filter_expr->type().ToIR(codegen), "expr_type_arg"); // Load 'filter' from 'this_arg' FilterContext object. 
- Value* filter_ptr = builder.CreateStructGEP(NULL, this_arg, 1, "filter_ptr"); + Value* filter_ptr = builder.CreateStructGEP(nullptr, this_arg, 1, "filter_ptr"); Value* filter_arg = builder.CreateLoad(filter_ptr, "filter_arg"); Value* run_filter_args[] = {filter_arg, val_ptr_phi, expr_type_arg}; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/filter-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h index 37d139d..fa95b91 100644 --- a/be/src/exec/filter-context.h +++ b/be/src/exec/filter-context.h @@ -20,9 +20,7 @@ #define IMPALA_EXEC_FILTER_CONTEXT_H #include <boost/unordered_map.hpp> -#include <gutil/strings/substitute.h> - -#include "exprs/expr-context.h" +#include "exprs/scalar-expr-evaluator.h" #include "util/runtime-profile.h" namespace impala { @@ -30,6 +28,7 @@ namespace impala { class BloomFilter; class LlvmCodeGen; class RuntimeFilter; +class ScalarExpr; class TupleRow; /// Container struct for per-filter statistics, with statistics for each granularity of @@ -80,45 +79,43 @@ class FilterStats { /// FilterContext contains all metadata for a single runtime filter, and allows the filter /// to be applied in the context of a single thread. struct FilterContext { - /// Expression which produces a value to test against the runtime filter. - /// This field is referenced in generated code so if the order of it changes - /// inside this struct, please update CodegenEval(). - ExprContext* expr_ctx; + /// Evaluator for 'expr'. This field is referenced in generated code so if the order + /// of it changes inside this struct, please update CodegenEval(). + ScalarExprEvaluator* expr_eval = nullptr; /// Cache of filter from runtime filter bank. /// The field is referenced in generated code so if the order of it changes /// inside this struct, please update CodegenEval(). 
- const RuntimeFilter* filter; + const RuntimeFilter* filter = nullptr; /// Statistics for this filter, owned by object pool. - FilterStats* stats; + FilterStats* stats = nullptr; /// Working copy of local bloom filter - BloomFilter* local_bloom_filter; + BloomFilter* local_bloom_filter = nullptr; /// Struct name in LLVM IR. static const char* LLVM_CLASS_NAME; /// Clones this FilterContext for use in a multi-threaded context (i.e. by scanner /// threads). - Status CloneFrom(const FilterContext& from, RuntimeState* state); + Status CloneFrom(const FilterContext& from, ObjectPool* pool, RuntimeState* state, + MemPool* mem_pool); - /// Evaluates 'row' on the expression in 'expr_ctx' with the resulting value being - /// checked against runtime filter 'filter' for matches. Returns true if 'row' finds + /// Evaluates 'row' with 'expr_eval' with the resulting value being checked + /// against runtime filter 'filter' for matches. Returns true if 'row' finds /// a match in 'filter'. Returns false otherwise. bool Eval(TupleRow* row) const noexcept; - /// Evaluates 'row' on the expression in 'expr_ctx' and hashes the resulting value. + /// Evaluates 'row' with 'expr_eval' and hashes the resulting value. /// The hash value is then used for setting some bits in 'local_bloom_filter'. void Insert(TupleRow* row) const noexcept; - /// Codegen Eval() by codegen'ing the expression evaluations and replacing the type + /// Codegen Eval() by codegen'ing the expression 'filter_expr' and replacing the type /// argument to RuntimeFilter::Eval() with a constant. On success, 'fn' is set to /// the generated function. On failure, an error status is returned. 
- Status CodegenEval(LlvmCodeGen* codegen, llvm::Function** fn) const; - - FilterContext() - : expr_ctx(NULL), filter(NULL), local_bloom_filter(NULL) { } + static Status CodegenEval(LlvmCodeGen* codegen, ScalarExpr* filter_expr, + llvm::Function** fn) WARN_UNUSED_RESULT; }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hash-join-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-join-node-ir.cc b/be/src/exec/hash-join-node-ir.cc index 7ef7ca4..25aa556 100644 --- a/be/src/exec/hash-join-node-ir.cc +++ b/be/src/exec/hash-join-node-ir.cc @@ -18,6 +18,7 @@ #include "codegen/impala-ir.h" #include "exec/hash-join-node.h" #include "exec/old-hash-table.inline.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/row-batch.h" #include "common/names.h" @@ -34,8 +35,8 @@ using namespace impala; // TODO: explicitly set the calling convention? // TODO: investigate using fastcc for all codegen internal functions? 
bool IR_NO_INLINE EvalOtherJoinConjuncts2( - ExprContext* const* ctxs, int num_ctxs, TupleRow* row) { - return ExecNode::EvalConjuncts(ctxs, num_ctxs, row); + ScalarExprEvaluator* const* evals, int num_evals, TupleRow* row) { + return ExecNode::EvalConjuncts(evals, num_evals, row); } // CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by @@ -52,11 +53,14 @@ int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch, int rows_returned = 0; int probe_rows = probe_batch->num_rows(); - ExprContext* const* other_conjunct_ctxs = &other_join_conjunct_ctxs_[0]; - const int num_other_conjunct_ctxs = other_join_conjunct_ctxs_.size(); + ScalarExprEvaluator* const* other_conjunct_evals = + other_join_conjunct_evals_.data(); + const int num_other_conjuncts = other_join_conjuncts_.size(); + DCHECK_EQ(num_other_conjuncts, other_join_conjunct_evals_.size()); - ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; - const int num_conjunct_ctxs = conjunct_ctxs_.size(); + ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data(); + const int num_conjuncts = conjuncts_.size(); + DCHECK_EQ(num_conjuncts, conjunct_evals_.size()); while (true) { // Create output row for each matching build row @@ -67,10 +71,10 @@ int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch, if (join_op_ == TJoinOp::LEFT_SEMI_JOIN) { // Evaluate the non-equi-join conjuncts against a temp row assembled from all // build and probe tuples. 
- if (num_other_conjunct_ctxs > 0) { + if (num_other_conjuncts > 0) { CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row); - if (!EvalOtherJoinConjuncts2(other_conjunct_ctxs, num_other_conjunct_ctxs, - semi_join_staging_row_)) { + if (!EvalOtherJoinConjuncts2(other_conjunct_evals, num_other_conjuncts, + semi_join_staging_row_)) { continue; } } @@ -78,13 +82,13 @@ int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch, } else { CreateOutputRow(out_row, current_probe_row_, matched_build_row); if (!EvalOtherJoinConjuncts2( - other_conjunct_ctxs, num_other_conjunct_ctxs, out_row)) { + other_conjunct_evals, num_other_conjuncts, out_row)) { continue; } } matched_probe_ = true; - if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { + if (EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) { ++rows_returned; // Filled up out batch or hit limit if (UNLIKELY(rows_returned == max_added_rows)) goto end; @@ -104,7 +108,7 @@ int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch, if (!matched_probe_ && match_all_probe_) { CreateOutputRow(out_row, current_probe_row_, NULL); matched_probe_ = true; - if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { + if (EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) { ++rows_returned; if (UNLIKELY(rows_returned == max_added_rows)) goto end; // Advance to next out row
