http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-hash-join-builder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h index c123a9b..e8dd524 100644 --- a/be/src/exec/partitioned-hash-join-builder.h +++ b/be/src/exec/partitioned-hash-join-builder.h @@ -277,9 +277,6 @@ class PhjBuilder : public DataSink { virtual Status Init(const std::vector<TExpr>& thrift_output_exprs, const TDataSink& tsink, RuntimeState* state) override; - /// Free local allocations made from expr evaluators during hash table construction. - void FreeLocalAllocations() const; - private: /// Create and initialize a set of hash partitions for partitioning level 'level'. /// The previous hash partitions must have been cleared with ClearHashPartitions(). @@ -388,7 +385,7 @@ class PhjBuilder : public DataSink { const RowDescriptor* probe_row_desc_; /// Pool for objects with same lifetime as builder. - ObjectPool pool_; + ObjectPool obj_pool_; /// Client to the buffer pool, used to allocate build partition buffers and hash tables. /// When probing, the spilling algorithm keeps some build partitions in memory while
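The pattern this change applies throughout the backend replaces per-evaluator FreeLocalAllocations() calls with two caller-owned MemPools: a permanent pool whose memory lives until the evaluator is closed, and a results pool that the owner reclaims in bulk. A minimal sketch of a caller, assuming an owner that drives the batch loop (EvalOverBatches() and ProcessBatch() are hypothetical; the Create()/Clear()/FreeAll() calls are the ones used in the diffs below):

  #include "exprs/scalar-expr-evaluator.h"
  #include "runtime/mem-pool.h"

  Status EvalOverBatches(RuntimeState* state, ObjectPool* obj_pool, MemTracker* tracker,
      const ScalarExpr& expr, const std::vector<RowBatch*>& batches) {
    MemPool expr_perm_pool(tracker);     // constants, staging AnyVals: freed at Close()
    MemPool expr_results_pool(tracker);  // per-row results: reclaimed in bulk by owner
    ScalarExprEvaluator* eval;
    RETURN_IF_ERROR(ScalarExprEvaluator::Create(
        expr, state, obj_pool, &expr_perm_pool, &expr_results_pool, &eval));
    RETURN_IF_ERROR(eval->Open(state));
    for (RowBatch* batch : batches) {
      ProcessBatch(eval, batch);  // hypothetical: evaluates 'expr' over each row
      expr_results_pool.Clear();  // replaces the old FreeLocalAllocations() call
    }
    eval->Close(state);
    expr_results_pool.FreeAll();
    expr_perm_pool.FreeAll();
    return Status::OK();
  }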
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index 4faa12e..94b49b3 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -119,13 +119,22 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) { runtime_profile()->PrependChild(builder_->profile()); RETURN_IF_ERROR(ScalarExprEvaluator::Create(other_join_conjuncts_, state, pool_, - expr_mem_pool(), &other_join_conjunct_evals_)); - AddEvaluatorsToFree(other_join_conjunct_evals_); + expr_perm_pool(), expr_results_pool(), &other_join_conjunct_evals_)); + probe_expr_results_pool_.reset(new MemPool(mem_tracker())); + + // We have to carefully set up expression evaluators in the HashTableCtx to use + // MemPools with appropriate lifetime. The values of build exprs are only used + // temporarily while processing each build batch or when processing a probe row + // so can be stored in 'expr_results_pool_', which is freed during + // QueryMaintenance(). Values of probe exprs may need to live longer until the + // cache is reset so are stored in 'probe_expr_results_pool_', which is cleared + // manually at the appropriate time. RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_, probe_exprs_, builder_->HashTableStoresNulls(), builder_->is_not_distinct_from(), state->fragment_hash_seed(), MAX_PARTITION_DEPTH, - child(1)->row_desc()->tuple_descriptors().size(), expr_mem_pool(), &ht_ctx_)); + child(1)->row_desc()->tuple_descriptors().size(), expr_perm_pool(), + expr_results_pool(), probe_expr_results_pool_.get(), &ht_ctx_)); if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime"); } @@ -163,14 +172,14 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) { RETURN_IF_ERROR(ht_ctx_->Open(state)); RETURN_IF_ERROR(ScalarExprEvaluator::Open(other_join_conjunct_evals_, state)); - // Check for errors and free local allocations before opening children. + // Check for errors and free expr result allocations before opening children. RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); - // The prepare functions of probe expressions may have done local allocations implicitly - // (e.g. calling UdfBuiltins::Lower()). The probe expressions' local allocations need to - // be freed now as they don't get freed again till probing. Other exprs' local allocations - // are freed in ExecNode::FreeLocalAllocations(). - ht_ctx_->FreeProbeLocalAllocations(); + // The prepare functions of probe expressions may have made result allocations implicitly + // (e.g. calling UdfBuiltins::Lower()). The probe expressions' expr result allocations need to + // be cleared now as they don't get cleared again till probing. Other exprs' result allocations + // are cleared in QueryMaintenance(). + probe_expr_results_pool_->Clear(); RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get())); RETURN_IF_ERROR(PrepareForProbe()); @@ -197,15 +206,6 @@ Status PartitionedHashJoinNode::AcquireResourcesForBuild(RuntimeState* state) { return Status::OK(); } -Status PartitionedHashJoinNode::QueryMaintenance(RuntimeState* state) { - // Build expressions may be evaluated during probing, so must be freed. - // Probe side expr is not included in QueryMaintenance(). 
We cache the probe expression // values in ExprValuesCache. Local allocations need to survive until the cache is reset // so we need to manually free probe expr local allocations. - if (ht_ctx_.get() != nullptr) ht_ctx_->FreeBuildLocalAllocations(); - return ExecNode::QueryMaintenance(state); -} - Status PartitionedHashJoinNode::Reset(RuntimeState* state) { if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { null_probe_output_idx_ = -1; @@ -260,6 +260,7 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) { ScalarExpr::Close(build_exprs_); ScalarExpr::Close(probe_exprs_); ScalarExpr::Close(other_join_conjuncts_); + if (probe_expr_results_pool_ != nullptr) probe_expr_results_pool_->FreeAll(); BlockingJoinNode::Close(state); } @@ -591,10 +592,10 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch)); } } - // Free local allocations of the probe side expressions only after ExprValuesCache - // has been reset. + // Free expr result allocations of the probe side expressions only after + // ExprValuesCache has been reset. DCHECK(ht_ctx_->expr_values_cache()->AtEnd()); - ht_ctx_->FreeProbeLocalAllocations(); + probe_expr_results_pool_->Clear(); // We want to return as soon as we have attached a tuple stream to the out_batch // (before preparing a new partition). The attached tuple stream will be recycled
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index cb51b9d..572be34 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -111,7 +111,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode { virtual void Close(RuntimeState* state) override; protected: - virtual Status QueryMaintenance(RuntimeState* state) override; virtual void AddToDebugString( int indentation_level, std::stringstream* out) const override; @@ -423,6 +422,11 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// and probing of the hash tables. boost::scoped_ptr<HashTableCtx> ht_ctx_; + /// MemPool that stores allocations that hold results from evaluation of probe + /// exprs by 'ht_ctx_'. Cached probe expression values may reference memory in this + /// pool. + boost::scoped_ptr<MemPool> probe_expr_results_pool_; + /// The iterator that corresponds to the look up of current_probe_row_. HashTable::Iterator hash_tbl_iterator_;
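The constraint that the new 'probe_expr_results_pool_' captures is that HashTableCtx's ExprValuesCache holds pointers into probe-expr result memory across QueryMaintenance() calls. A rough sketch of the resulting discipline (the driver loop and ProcessProbeBatch() are hypothetical; the individual calls appear in the hunks above):

  // Hypothetical probe-side driver loop.
  while (!probe_side_eos) {
    ProcessProbeBatch(ht_ctx_.get());  // may leave values in the ExprValuesCache
    // QueryMaintenance() clears expr_results_pool() (build exprs, conjuncts)
    // but must not touch 'probe_expr_results_pool_': the cache may still
    // reference memory in it.
    RETURN_IF_ERROR(QueryMaintenance(state));
    if (ht_ctx_->expr_values_cache()->AtEnd()) {
      // Cache fully consumed: no outstanding references, safe to reclaim.
      probe_expr_results_pool_->Clear();
    }
  }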
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/plan-root-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc index 2e8408a..9c20ff3 100644 --- a/be/src/exec/plan-root-sink.cc +++ b/be/src/exec/plan-root-sink.cc @@ -97,9 +97,10 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) { RETURN_IF_ERROR(results_->AddOneRow(result_row, scales)); ++current_batch_row; } + // Prevent expr result allocations from accumulating. + expr_results_pool_->Clear(); // Signal the consumer. results_ = nullptr; - ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_); consumer_cv_.notify_all(); } return Status::OK(); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc index 94f73c0..eba7727 100644 --- a/be/src/exec/scan-node.cc +++ b/be/src/exec/scan-node.cc @@ -115,8 +115,7 @@ Status ScanNode::Prepare(RuntimeState* state) { DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size()); for (int i = 0; i < filter_exprs_.size(); ++i) { RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, pool_, - expr_mem_pool(), &filter_ctxs_[i].expr_eval)); - AddEvaluatorToFree(filter_ctxs_[i].expr_eval); + expr_perm_pool(), expr_results_pool(), &filter_ctxs_[i].expr_eval)); } return Status::OK();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 16a09e4..3ed8b4a 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -44,12 +44,13 @@ static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0; ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node, HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range, - const vector<FilterContext>& filter_ctxs) + const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool) : state_(state), scan_node_(scan_node), partition_desc_(partition_desc), num_completed_io_buffers_(0), - filter_ctxs_(filter_ctxs) { + filter_ctxs_(filter_ctxs), + expr_results_pool_(expr_results_pool) { AddStream(scan_range); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index bd5623c..c13d26f 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -65,7 +65,8 @@ class ScannerContext { /// get pushed to) and the scan range to process. /// This context starts with 1 stream. ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*, - DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs); + DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs, + MemPool* expr_results_pool); /// Destructor verifies that all stream objects have been released. ~ScannerContext(); @@ -311,7 +312,7 @@ class ScannerContext { int num_completed_io_buffers() const { return num_completed_io_buffers_; } HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; } const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; } - + MemPool* expr_results_pool() const { return expr_results_pool_; } private: friend class Stream; @@ -329,6 +330,18 @@ class ScannerContext { /// Filter contexts for all filters applicable to this scan. Memory attached to the /// context is owned by the scan node. std::vector<FilterContext> filter_ctxs_; + + /// MemPool used for allocations that hold results of expression evaluation in the + /// scanner and 'filter_ctxs_'. Must be thread-local since MemPool is not thread-safe.
+ /// Owned by ScannerThread() in the multi-threaded scan node and by the ExecNode in the + /// single-threaded scan node implementation. + /// + /// The scanner is responsible for clearing the MemPool periodically after expression + /// evaluation to prevent memory from accumulating. + /// + /// TODO: IMPALA-6015: it should be possible to simplify the lifecycle of this pool and + /// filter_ctxs_ once the multithreaded scan node is removed. + MemPool* const expr_results_pool_; }; }
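Following the ownership comment above, a scanner thread in the multi-threaded scan node would wire the pool up roughly like this (a sketch; everything except the ScannerContext constructor signature is an assumption, including the mem_tracker() accessor):

  void HdfsScanNode::ScannerThread() {
    // Each scanner thread owns its own results pool; MemPool is not
    // thread-safe, so the pool must never be shared across threads.
    MemPool expr_results_pool(mem_tracker());
    ScannerContext context(runtime_state_, this, partition_desc, scan_range,
        filter_ctxs, &expr_results_pool);
    // ... process the scan range, clearing the pool periodically ...
    expr_results_pool.FreeAll();
  }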
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/sort-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc index 33b3acb..9ab1435 100644 --- a/be/src/exec/sort-node.cc +++ b/be/src/exec/sort-node.cc @@ -52,12 +52,11 @@ Status SortNode::Init(const TPlanNode& tnode, RuntimeState* state) { Status SortNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Prepare(state)); - less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_)); sorter_.reset( - new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_, mem_tracker(), - &buffer_pool_client_, resource_profile_.spillable_buffer_size, - runtime_profile(), state, id(), true)); - RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool())); + new Sorter(ordering_exprs_, is_asc_order_, nulls_first_, sort_tuple_exprs_, + &row_descriptor_, mem_tracker(), &buffer_pool_client_, + resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), true)); + RETURN_IF_ERROR(sorter_->Prepare(pool_)); DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation()); AddCodegenDisabledMessage(state); return Status::OK(); @@ -67,7 +66,7 @@ void SortNode::Codegen(RuntimeState* state) { DCHECK(state->ShouldCodegen()); ExecNode::Codegen(state); if (IsNodeCodegenDisabled()) return; - Status codegen_status = less_than_->Codegen(state); + Status codegen_status = sorter_->Codegen(state); runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status); } @@ -80,7 +79,6 @@ Status SortNode::Open(RuntimeState* state) { if (!buffer_pool_client_.is_registered()) { RETURN_IF_ERROR(ClaimBufferReservation(state)); } - RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool())); RETURN_IF_ERROR(sorter_->Open()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); @@ -153,7 +151,6 @@ Status SortNode::Reset(RuntimeState* state) { void SortNode::Close(RuntimeState* state) { if (is_closed()) return; - if (less_than_.get() != nullptr) less_than_->Close(state); if (sorter_ != nullptr) sorter_->Close(state); sorter_.reset(); ScalarExpr::Close(ordering_exprs_); @@ -161,11 +158,6 @@ void SortNode::Close(RuntimeState* state) { ExecNode::Close(state); } -Status SortNode::QueryMaintenance(RuntimeState* state) { - sorter_->FreeLocalAllocations(); - return ExecNode::QueryMaintenance(state); -} - void SortNode::DebugString(int indentation_level, stringstream* out) const { *out << string(indentation_level * 2, ' '); *out << "SortNode(" << ScalarExpr::DebugString(ordering_exprs_);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/sort-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h index d6eef25..8075b8e 100644 --- a/be/src/exec/sort-node.h +++ b/be/src/exec/sort-node.h @@ -46,7 +46,6 @@ class SortNode : public ExecNode { virtual void Close(RuntimeState* state); protected: - virtual Status QueryMaintenance(RuntimeState* state); virtual void DebugString(int indentation_level, std::stringstream* out) const; private: @@ -56,9 +55,6 @@ class SortNode : public ExecNode { /// Number of rows to skip. int64_t offset_; - /// Compares tuples according to 'ordering_exprs'. - boost::scoped_ptr<TupleRowComparator> less_than_; - /// Expressions and parameters used for tuple comparison. std::vector<ScalarExpr*> ordering_exprs_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/topn-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc index 5bba89d..6b4946f 100644 --- a/be/src/exec/topn-node.cc +++ b/be/src/exec/topn-node.cc @@ -75,8 +75,7 @@ Status TopNNode::Prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Prepare(state)); tuple_pool_.reset(new MemPool(mem_tracker())); RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_tuple_exprs_, state, pool_, - expr_mem_pool(), &output_tuple_expr_evals_)); - AddEvaluatorsToFree(output_tuple_expr_evals_); + expr_perm_pool(), expr_results_pool(), &output_tuple_expr_evals_)); tuple_row_less_than_.reset( new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_)); output_tuple_desc_ = row_descriptor_.tuple_descriptors()[0]; @@ -139,7 +138,8 @@ void TopNNode::Codegen(RuntimeState* state) { Status TopNNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); - RETURN_IF_ERROR(tuple_row_less_than_->Open(pool_, state, expr_mem_pool())); + RETURN_IF_ERROR( + tuple_row_less_than_->Open(pool_, state, expr_perm_pool(), expr_results_pool())); RETURN_IF_ERROR(ScalarExprEvaluator::Open(output_tuple_expr_evals_, state)); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); @@ -231,11 +231,6 @@ void TopNNode::Close(RuntimeState* state) { ExecNode::Close(state); } -Status TopNNode::QueryMaintenance(RuntimeState* state) { - tuple_row_less_than_->FreeLocalAllocations(); - return ExecNode::QueryMaintenance(state); -} - // Reverse the order of the tuples in the priority queue void TopNNode::PrepareForOutput() { sorted_top_n_.resize(priority_queue_.size());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/topn-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h index b3cf1c0..e12cfc3 100644 --- a/be/src/exec/topn-node.h +++ b/be/src/exec/topn-node.h @@ -51,7 +51,6 @@ class TopNNode : public ExecNode { virtual void Close(RuntimeState* state); protected: - virtual Status QueryMaintenance(RuntimeState* state); virtual void DebugString(int indentation_level, std::stringstream* out) const; private:
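Collected from the hunks above, the net SortNode lifecycle after this change delegates everything comparator-related to the Sorter. Stitched together from the new lines (a sketch; surrounding boilerplate elided):

  // Prepare(): the Sorter now builds and owns the TupleRowComparator.
  sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
      sort_tuple_exprs_, &row_descriptor_, mem_tracker(), &buffer_pool_client_,
      resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), true));
  RETURN_IF_ERROR(sorter_->Prepare(pool_));
  // Codegen() and Open(): no separate less_than_ to codegen or open.
  Status codegen_status = sorter_->Codegen(state);
  RETURN_IF_ERROR(sorter_->Open());
  // Close(): a single object to tear down; no QueryMaintenance() override needed.
  sorter_->Close(state);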
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/union-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc index b3681be..f2acc7a 100644 --- a/be/src/exec/union-node.cc +++ b/be/src/exec/union-node.cc @@ -80,8 +80,7 @@ Status UnionNode::Prepare(RuntimeState* state) { for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) { vector<ScalarExprEvaluator*> const_expr_evals; RETURN_IF_ERROR(ScalarExprEvaluator::Create(const_exprs, state, pool_, - expr_mem_pool(), &const_expr_evals)); - AddEvaluatorsToFree(const_expr_evals); + expr_perm_pool(), expr_results_pool(), &const_expr_evals)); const_expr_evals_lists_.push_back(const_expr_evals); } @@ -89,8 +88,7 @@ Status UnionNode::Prepare(RuntimeState* state) { for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) { vector<ScalarExprEvaluator*> child_expr_evals; RETURN_IF_ERROR(ScalarExprEvaluator::Create(child_exprs, state, pool_, - expr_mem_pool(), &child_expr_evals)); - AddEvaluatorsToFree(child_expr_evals); + expr_perm_pool(), expr_results_pool(), &child_expr_evals)); child_expr_evals_lists_.push_back(child_expr_evals); } return Status::OK();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/unnest-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc index 2c02ff7..ef09731 100644 --- a/be/src/exec/unnest-node.cc +++ b/be/src/exec/unnest-node.cc @@ -84,7 +84,7 @@ Status UnnestNode::Prepare(RuntimeState* state) { item_byte_size_ = item_tuple_desc->byte_size(); RETURN_IF_ERROR(ScalarExprEvaluator::Create(*coll_expr_, state, pool_, - expr_mem_pool(), &coll_expr_eval_)); + expr_perm_pool(), expr_results_pool(), &coll_expr_eval_)); // Set the coll_slot_desc_ and the corresponding tuple index used for manually // evaluating the collection SlotRef and for projection.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/agg-fn-evaluator.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn-evaluator.cc b/be/src/exprs/agg-fn-evaluator.cc index f5a6098..1af76f4 100644 --- a/be/src/exprs/agg-fn-evaluator.cc +++ b/be/src/exprs/agg-fn-evaluator.cc @@ -71,10 +71,9 @@ typedef AnyVal (*FinalizeFn)(FunctionContext*, const AnyVal&); const char* AggFnEvaluator::LLVM_CLASS_NAME = "class.impala::AggFnEvaluator"; -AggFnEvaluator::AggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, bool is_clone) : is_clone_(is_clone), - agg_fn_(agg_fn), - mem_pool_(mem_pool) { +AggFnEvaluator::AggFnEvaluator(const AggFn& agg_fn, bool is_clone) : is_clone_(is_clone), + agg_fn_(agg_fn) { } AggFnEvaluator::~AggFnEvaluator() { @@ -90,26 +89,27 @@ const ColumnType& AggFnEvaluator::intermediate_type() const { } Status AggFnEvaluator::Create(const AggFn& agg_fn, RuntimeState* state, ObjectPool* pool, - MemPool* mem_pool, AggFnEvaluator** result) { + MemPool* expr_perm_pool, MemPool* expr_results_pool, AggFnEvaluator** result) { *result = nullptr; // Create a new AggFn evaluator. - AggFnEvaluator* agg_fn_eval = pool->Add(new AggFnEvaluator(agg_fn, mem_pool, false)); - agg_fn_eval->agg_fn_ctx_.reset(FunctionContextImpl::CreateContext(state, mem_pool, - agg_fn.GetIntermediateTypeDesc(), agg_fn.GetOutputTypeDesc(), + AggFnEvaluator* agg_fn_eval = pool->Add(new AggFnEvaluator(agg_fn, false)); + agg_fn_eval->agg_fn_ctx_.reset(FunctionContextImpl::CreateContext(state, expr_perm_pool, + expr_results_pool, agg_fn.GetIntermediateTypeDesc(), agg_fn.GetOutputTypeDesc(), agg_fn.arg_type_descs())); Status status; // Create the evaluators for the input expressions.
for (const ScalarExpr* input_expr : agg_fn.children()) { ScalarExprEvaluator* input_eval; - status = ScalarExprEvaluator::Create(*input_expr, state, pool, mem_pool, &input_eval); + status = ScalarExprEvaluator::Create( + *input_expr, state, pool, expr_perm_pool, expr_results_pool, &input_eval); if (UNLIKELY(!status.ok())) goto cleanup; agg_fn_eval->input_evals_.push_back(input_eval); DCHECK(&input_eval->root() == input_expr); AnyVal* staging_input_val; - status = AllocateAnyVal(state, mem_pool, input_expr->type(), + status = AllocateAnyVal(state, expr_perm_pool, input_expr->type(), "Could not allocate aggregate expression input value", &staging_input_val); agg_fn_eval->staging_input_vals_.push_back(staging_input_val); if (UNLIKELY(!status.ok())) goto cleanup; @@ -117,11 +117,11 @@ Status AggFnEvaluator::Create(const AggFn& agg_fn, RuntimeState* state, ObjectPo DCHECK_EQ(agg_fn.GetNumChildren(), agg_fn_eval->input_evals_.size()); DCHECK_EQ(agg_fn_eval->staging_input_vals_.size(), agg_fn_eval->input_evals_.size()); - status = AllocateAnyVal(state, mem_pool, agg_fn.intermediate_type(), + status = AllocateAnyVal(state, expr_perm_pool, agg_fn.intermediate_type(), "Could not allocate aggregate expression intermediate value", &(agg_fn_eval->staging_intermediate_val_)); if (UNLIKELY(!status.ok())) goto cleanup; - status = AllocateAnyVal(state, mem_pool, agg_fn.intermediate_type(), + status = AllocateAnyVal(state, expr_perm_pool, agg_fn.intermediate_type(), "Could not allocate aggregate expression merge input value", &(agg_fn_eval->staging_merge_input_val_)); if (UNLIKELY(!status.ok())) goto cleanup; @@ -140,10 +140,12 @@ cleanup: } Status AggFnEvaluator::Create(const vector<AggFn*>& agg_fns, RuntimeState* state, - ObjectPool* pool, MemPool* mem_pool, vector<AggFnEvaluator*>* evals) { + ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool, + vector<AggFnEvaluator*>* evals) { for (const AggFn* agg_fn : agg_fns) { AggFnEvaluator* agg_fn_eval; - RETURN_IF_ERROR(AggFnEvaluator::Create(*agg_fn, state, pool, mem_pool, &agg_fn_eval)); + RETURN_IF_ERROR(AggFnEvaluator::Create(*agg_fn, state, pool, expr_perm_pool, + expr_results_pool, &agg_fn_eval)); evals->push_back(agg_fn_eval); } return Status::OK(); @@ -176,7 +178,6 @@ void AggFnEvaluator::Close(RuntimeState* state) { if (closed_) return; closed_ = true; if (!is_clone_) ScalarExprEvaluator::Close(input_evals_, state); - FreeLocalAllocations(); agg_fn_ctx_->impl()->Close(); agg_fn_ctx_.reset(); input_evals_.clear(); @@ -494,11 +495,12 @@ void AggFnEvaluator::SerializeOrFinalize(Tuple* src, } } -void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool, - AggFnEvaluator** cloned_eval) const { +void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool, + MemPool* expr_results_pool, AggFnEvaluator** cloned_eval) const { DCHECK(opened_); - *cloned_eval = pool->Add(new AggFnEvaluator(agg_fn_, mem_pool, true)); - (*cloned_eval)->agg_fn_ctx_.reset(agg_fn_ctx_->impl()->Clone(mem_pool)); + *cloned_eval = pool->Add(new AggFnEvaluator(agg_fn_, true)); + (*cloned_eval)->agg_fn_ctx_.reset( + agg_fn_ctx_->impl()->Clone(expr_perm_pool, expr_results_pool)); DCHECK_EQ((*cloned_eval)->input_evals_.size(), 0); (*cloned_eval)->input_evals_ = input_evals_; (*cloned_eval)->staging_input_vals_ = staging_input_vals_; @@ -507,21 +509,22 @@ void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool, (*cloned_eval)->opened_ = true; } -void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool, - const 
vector<AggFnEvaluator*>& evals, +void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool, + MemPool* expr_results_pool, const vector<AggFnEvaluator*>& evals, vector<AggFnEvaluator*>* cloned_evals) { for (const AggFnEvaluator* eval : evals) { AggFnEvaluator* cloned_eval; - eval->ShallowClone(pool, mem_pool, &cloned_eval); + eval->ShallowClone(pool, expr_perm_pool, expr_results_pool, &cloned_eval); cloned_evals->push_back(cloned_eval); } } -void AggFnEvaluator::FreeLocalAllocations() { - ScalarExprEvaluator::FreeLocalAllocations(input_evals_); - agg_fn_ctx_->impl()->FreeLocalAllocations(); -} - -void AggFnEvaluator::FreeLocalAllocations(const vector<AggFnEvaluator*>& evals) { - for (AggFnEvaluator* eval : evals) eval->FreeLocalAllocations(); +vector<ScopedResultsPool> ScopedResultsPool::Create( + const vector<AggFnEvaluator*>& evals, MemPool* new_results_pool) { + vector<ScopedResultsPool> result; + result.reserve(evals.size()); + for (AggFnEvaluator* eval : evals) { + result.emplace_back(eval, new_results_pool); + } + return result; }
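For contrast with the old single-pool interface, a caller of the new Create() overload looks roughly like this (a sketch; 'agg_fns_' and the pool accessors are assumed to exist on the calling exec node):

  std::vector<AggFnEvaluator*> agg_fn_evals;
  // Staging input/intermediate AnyVals are carved out of the perm pool;
  // Serialize()/Finalize()/GetValue() results land in the results pool.
  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_,
      expr_perm_pool(), expr_results_pool(), &agg_fn_evals));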
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/agg-fn-evaluator.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h index 2ce968c..e8607a4 100644 --- a/be/src/exprs/agg-fn-evaluator.h +++ b/be/src/exprs/agg-fn-evaluator.h @@ -62,16 +62,26 @@ class AggFnEvaluator { public: /// Creates an AggFnEvaluator object from the aggregate expression 'agg_fn'. /// The evaluator is added to 'pool' and returned in 'eval'. This will also - /// create a single evaluator for each input expression. All allocations will come - /// from 'mem_pool'. Note that it's the responsibility to call Close() all evaluators - /// even if this function returns error status on initialization failure. + /// create a single evaluator for each input expression. + /// + /// Permanent allocations (i.e. those that must live until the evaluator is closed) come + /// from 'expr_perm_pool'. Allocations that may contain expr results come from + /// 'expr_results_pool'. Lifetime of memory in 'expr_results_pool' is managed by the + /// owner of the pool and may be freed at any time except when the evaluator is in the + /// middle of evaluating the expression. These pools can be shared between evaluators + /// (so long as the required memory lifetimes are compatible) but cannot be shared + /// between threads since MemPools are not thread-safe. + /// + /// Note that the caller is responsible for calling Close() on all evaluators even if + /// this function returns error status on initialization failure. static Status Create(const AggFn& agg_fn, RuntimeState* state, ObjectPool* pool, - MemPool* mem_pool, AggFnEvaluator** eval) WARN_UNUSED_RESULT; + MemPool* expr_perm_pool, MemPool* expr_results_pool, + AggFnEvaluator** eval) WARN_UNUSED_RESULT; /// Convenience functions for creating evaluators for multiple aggregate functions. static Status Create(const std::vector<AggFn*>& agg_fns, RuntimeState* state, - ObjectPool* pool, MemPool* mem_pool, std::vector<AggFnEvaluator*>* evals) - WARN_UNUSED_RESULT; + ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool, + vector<AggFnEvaluator*>* evals) WARN_UNUSED_RESULT; ~AggFnEvaluator(); @@ -87,17 +97,17 @@ class AggFnEvaluator { /// Used by PartitionedAggregation node to initialize one evaluator per partition. /// Avoid the overhead of re-initializing an evaluator (e.g. calling GetConstVal() /// on the input expressions). Cannot be called until after Open() has been called. - /// 'cloned_eval' is a shallow copy of this evaluator: all input values, staging + /// 'cloned_eval' is a shallow copy of this evaluator: all input evaluators, staging /// intermediate values and merge values are shared with the original evaluator. Only /// the FunctionContext 'agg_fn_ctx' is cloned for resource isolation per partition. /// So, it's not safe to use cloned evaluators concurrently. - void ShallowClone( - ObjectPool* pool, MemPool* mem_pool, AggFnEvaluator** cloned_eval) const; + void ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool, + AggFnEvaluator** cloned_eval) const; /// Convenience function for cloning multiple evaluators. The newly cloned evaluators /// are appended to 'cloned_evals'. - static void ShallowClone(ObjectPool* pool, MemPool* mem_pool, - const std::vector<AggFnEvaluator*>& evals, + static void ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool, + MemPool* expr_results_pool, const std::vector<AggFnEvaluator*>& evals, std::vector<AggFnEvaluator*>* cloned_evals); /// Free resources owned by the evaluator. @@ -110,56 +120,70 @@ class AggFnEvaluator { ScalarExprEvaluator* const* IR_ALWAYS_INLINE input_evals() const; - /// Call the initialization function of the AggFn. May update 'dst'. + /// Call the initialization function of the AggFn. May update 'dst'. Any var-len string + /// data referenced from the tuple must be backed by an allocation from + /// FunctionContext::Allocate() (which is ultimately backed by the permanent MemPool). void Init(Tuple* dst); /// Updates the intermediate state dst based on adding the input src row. This can be /// called either to drive the UDA's Update() or Merge() function, depending on whether - /// the AggFn is a merging aggregation. + /// the AggFn is a merging aggregation. Any var-len string data referenced from the + /// tuple must be backed by an allocation from FunctionContext::Allocate() (which is + /// ultimately backed by the permanent MemPool). void Add(const TupleRow* src, Tuple* dst); /// Updates the intermediate state dst to remove the input src row, i.e. undo - /// Add(src, dst). Only used internally for analytic fn builtins. + /// Add(src, dst). Only used internally for analytic fn builtins. Any var-len string + /// data referenced from the tuple must be backed by an expr-managed allocation from + /// FunctionContext::Allocate() (which is ultimately backed by the permanent MemPool). void Remove(const TupleRow* src, Tuple* dst); /// Explicitly does a merge, even if this evaluator is not marked as merging. /// This is used by the partitioned agg node when it needs to merge spill results. - /// In the non-spilling case, this node would normally not merge. + /// In the non-spilling case, this node would normally not merge. Any var-len string + /// data referenced from the tuple must be backed by an expr-managed allocation from + /// FunctionContext::Allocate() (which is ultimately backed by the permanent MemPool). void Merge(Tuple* src, Tuple* dst); /// Flattens any intermediate values containing pointers, and frees any memory - /// allocated during the init, update and merge phases. + /// allocated during the init, update and merge phases. Note that a variable-length + /// string result is backed by the results MemPool, so the caller must be careful not to + /// clear that pool until it is done with the results of expr evaluation.
void Serialize(Tuple* dst); /// Does one final transformation of the aggregated value in 'agg_val' and stores the /// result in 'output_val'. Also frees the resources allocated during init, update and - /// merge phases. + /// merge phases. Note that variable-length string results are backed by the results + /// MemPool, so the caller must be careful not to clear that pool until it is done + /// with the results of expr evaluation. void Finalize(Tuple* agg_val, Tuple* output_val); /// Puts the finalized value from Tuple* src in Tuple* dst just as Finalize() does. /// However, unlike Finalize(), GetValue() does not clean up state in src. /// GetValue() can be called repeatedly with the same src. Only used internally for - /// analytic fn builtins. Note that StringVal result is from local allocation (which - /// will be freed in the next QueryMaintenance()) so it needs to be copied out if it - /// needs to survive beyond QueryMaintenance() (e.g. if 'dst' lives in a row batch). + /// analytic fn builtins. Note that variable-length string results are backed by + /// the results MemPool, so the caller must be careful not to clear the results pool + /// until it is done with the results of expr evaluation. void GetValue(Tuple* src, Tuple* dst); /// Helper functions for calling the above functions on many evaluators. static void Init(const std::vector<AggFnEvaluator*>& evals, Tuple* dst); - static void Add(const std::vector<AggFnEvaluator*>& evals, const TupleRow* src, - Tuple* dst); - static void Remove(const std::vector<AggFnEvaluator*>& evals, - const TupleRow* src, Tuple* dst); - static void Serialize(const std::vector<AggFnEvaluator*>& evals, - Tuple* dst); - static void GetValue(const std::vector<AggFnEvaluator*>& evals, Tuple* src, - Tuple* dst); - static void Finalize(const std::vector<AggFnEvaluator*>& evals, Tuple* src, - Tuple* dst); - - /// Free local allocations made in UDA functions and input arguments' evals. - void FreeLocalAllocations(); - static void FreeLocalAllocations(const std::vector<AggFnEvaluator*>& evals); + static void Add( + const std::vector<AggFnEvaluator*>& evals, const TupleRow* src, Tuple* dst); + static void Remove( + const std::vector<AggFnEvaluator*>& evals, const TupleRow* src, Tuple* dst); + static void Serialize(const std::vector<AggFnEvaluator*>& evals, Tuple* dst); + static void GetValue(const std::vector<AggFnEvaluator*>& evals, Tuple* src, Tuple* dst); + static void Finalize(const std::vector<AggFnEvaluator*>& evals, Tuple* src, Tuple* dst); + + /// Replaces the current pool used for the aggregate function's result allocations + /// with 'new_results_pool' and returns the previously-used pool. Useful if the + /// caller wants functions like Serialize(), Finalize() and GetValue() to allocate + /// from a different MemPool. Does *not* change the pool for the input exprs. + /// This should generally be used via ScopedResultsPool instead of directly. + MemPool* SwapResultsPool(MemPool* new_results_pool) { + return agg_fn_ctx_->impl()->SwapResultsPool(new_results_pool); + } std::string DebugString() const; static std::string DebugString(const std::vector<AggFnEvaluator*>& evals); @@ -178,10 +202,6 @@ class AggFnEvaluator { const AggFn& agg_fn_; - /// Pointer to the MemPool which all allocations come from. - /// Owned by the exec node which owns this evaluator. - MemPool* mem_pool_ = nullptr; - /// This contains runtime state such as constant input arguments to the aggregate /// functions and a FreePool from which the intermediate values are allocated.
/// Owned by this evaluator. @@ -202,7 +222,7 @@ class AggFnEvaluator { impala_udf::AnyVal* staging_merge_input_val_ = nullptr; /// Use Create() instead. - AggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, bool is_clone); + AggFnEvaluator(const AggFn& agg_fn, bool is_clone); /// Return the intermediate type of the aggregate function. inline const SlotDescriptor& intermediate_slot_desc() const; @@ -221,10 +241,9 @@ class AggFnEvaluator { /// Sets up the arguments to call 'fn'. This converts from the agg-expr signature, /// taking TupleRow to the UDA signature taking AnyVals. Writes the serialize/finalize /// result to the given destination slot/tuple. 'fn' can be NULL to indicate the src - /// value should simply be written into the destination. Note that StringVal result is - /// from local allocation (which will be freed in the next QueryMaintenance()) so it - /// needs to be copied out if it needs to survive beyond QueryMaintenance() (e.g. if - /// 'dst' lives in a row batch). + /// value should simply be written into the destination. Note that a variable-length + /// string result is backed by the results MemPool, so the caller must be careful not to + /// clear that pool until it is done with the results of expr evaluation. void SerializeOrFinalize(Tuple* src, const SlotDescriptor& dst_slot_desc, Tuple* dst, void* fn); @@ -289,8 +308,24 @@ inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& evals, } } +/// Utility class to swap in a different results pool for the aggregate functions. +/// The previous results pool is restored when this goes out of scope. +class ScopedResultsPool { + public: + ScopedResultsPool(AggFnEvaluator* agg_fn_eval, MemPool* new_results_pool) + : agg_fn_eval_(agg_fn_eval), + prev_results_pool_(agg_fn_eval->SwapResultsPool(new_results_pool)) {} + + ~ScopedResultsPool() { agg_fn_eval_->SwapResultsPool(prev_results_pool_); } + /// Helper to swap in the same pool to many evaluators. + static std::vector<ScopedResultsPool> Create( + const std::vector<AggFnEvaluator*>& evals, MemPool* new_results_pool); + private: + AggFnEvaluator* const agg_fn_eval_; + MemPool* const prev_results_pool_; +}; } #endif
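ScopedResultsPool is intended to be used RAII-style. For example, to route serialized aggregate values into a row batch's pool for the duration of a single call (a hypothetical sketch; 'agg_fn_evals_' and 'intermediate_tuple' are assumed names, and the real call sites are in the aggregation node, which is not part of this excerpt):

  {
    // Swap each evaluator's results pool for the output batch's tuple data
    // pool; the destructors restore the previous pools on scope exit.
    std::vector<ScopedResultsPool> guards =
        ScopedResultsPool::Create(agg_fn_evals_, out_batch->tuple_data_pool());
    AggFnEvaluator::Serialize(agg_fn_evals_, intermediate_tuple);
  }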
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/case-expr.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/case-expr.cc b/be/src/exprs/case-expr.cc index 322c975..5a3d1fb 100644 --- a/be/src/exprs/case-expr.cc +++ b/be/src/exprs/case-expr.cc @@ -59,11 +59,11 @@ Status CaseExpr::OpenEvaluator(FunctionContext::FunctionStateScope scope, fn_ctx->SetFunctionState(FunctionContext::THREAD_LOCAL, case_state); const ColumnType& case_val_type = has_case_expr_ ? GetChild(0)->type() : TYPE_BOOLEAN; - RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), case_val_type, + RETURN_IF_ERROR(AllocateAnyVal(state, eval->expr_perm_pool(), case_val_type, "Could not allocate expression value", &case_state->case_val)); const ColumnType& when_val_type = has_case_expr_ ? GetChild(1)->type() : GetChild(0)->type(); - RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), when_val_type, + RETURN_IF_ERROR(AllocateAnyVal(state, eval->expr_perm_pool(), when_val_type, "Could not allocate expression value", &case_state->when_val)); return Status::OK(); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/cast-functions-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/cast-functions-ir.cc b/be/src/exprs/cast-functions-ir.cc index 518dd5a..26bbe9b 100644 --- a/be/src/exprs/cast-functions-ir.cc +++ b/be/src/exprs/cast-functions-ir.cc @@ -185,7 +185,7 @@ StringVal CastFunctions::CastToChar(FunctionContext* ctx, const StringVal& val) DCHECK_GE(type.len, 1); char* cptr; if (type.len > val.len) { - cptr = reinterpret_cast<char*>(ctx->impl()->AllocateLocal(type.len)); + cptr = reinterpret_cast<char*>(ctx->impl()->AllocateForResults(type.len)); if (UNLIKELY(cptr == NULL)) { DCHECK(!ctx->impl()->state()->GetQueryStatus().ok()); return StringVal::null();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/expr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc index bf7808d..a460f5f 100644 --- a/be/src/exprs/expr-test.cc +++ b/be/src/exprs/expr-test.cc @@ -1234,7 +1234,8 @@ void ExprTest::TestSingleLiteralConstruction( Literal* expr = CreateLiteral(type, string_val); ScalarExprEvaluator* eval; - EXPECT_OK(ScalarExprEvaluator::Create(*expr, &state, &pool, &mem_pool, &eval)); + EXPECT_OK( + ScalarExprEvaluator::Create(*expr, &state, &pool, &mem_pool, &mem_pool, &eval)); EXPECT_OK(eval->Open(&state)); EXPECT_EQ(0, RawValue::Compare(eval->GetValue(nullptr), &value, type)) << "type: " << type << ", value: " << value; @@ -1252,7 +1253,8 @@ TEST_F(ExprTest, NullLiteral) { NullLiteral expr(static_cast<PrimitiveType>(type)); ScalarExprEvaluator* eval; - EXPECT_OK(ScalarExprEvaluator::Create(expr, &state, &pool, &mem_pool, &eval)); + EXPECT_OK( + ScalarExprEvaluator::Create(expr, &state, &pool, &mem_pool, &mem_pool, &eval)); EXPECT_OK(eval->Open(&state)); EXPECT_TRUE(eval->GetValue(nullptr) == nullptr); eval->Close(&state); @@ -3293,12 +3295,18 @@ TEST_F(ExprTest, StringFunctions) { StringVal bam(static_cast<uint8_t*>(short_buf->data()), StringVal::MAX_LENGTH); auto r4 = StringFunctions::Replace(context, bam, z, aaa); EXPECT_TRUE(r4.is_null); + // Re-create context to clear the error from failed allocation. + UdfTestHarness::CloseContext(context); + context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool); // Similar test for second overflow. This tests overflowing on re-allocation. (*short_buf)[4095] = 'Z'; StringVal bam2(static_cast<uint8_t*>(short_buf->data()), StringVal::MAX_LENGTH-2); auto r5 = StringFunctions::Replace(context, bam2, z, aaa); EXPECT_TRUE(r5.is_null); + // Re-create context to clear the error from failed allocation.
+ UdfTestHarness::CloseContext(context); + context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool); // Finally, test expanding to exactly MAX_LENGTH // There are 4 Zs in giga4 (not including the trailing one, as we truncate that) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/hive-udf-call.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc index a3e0fe6..19e2e63 100644 --- a/be/src/exprs/hive-udf-call.cc +++ b/be/src/exprs/hive-udf-call.cc @@ -229,7 +229,7 @@ Status HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope, RETURN_ERROR_IF_EXC(env); RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor)); - RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), type_, + RETURN_IF_ERROR(AllocateAnyVal(state, eval->expr_perm_pool(), type_, "Could not allocate JNI output value", &jni_ctx->output_anyval)); return Status::OK(); } @@ -327,7 +327,7 @@ StringVal HiveUdfCall::GetStringVal( DCHECK_EQ(type_.type, TYPE_STRING); StringVal result = *reinterpret_cast<StringVal*>(Evaluate(eval, row)); if (result.is_null) return StringVal::null(); - // Copy the string into a local allocation with the usual lifetime for expr results. + // Copy the string into a result allocation with the usual lifetime for expr results. // Needed because the UDF output buffer is owned by the Java UDF executor and may be // freed or reused by the next call into the Java UDF executor. FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/scalar-expr-evaluator.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/scalar-expr-evaluator.cc b/be/src/exprs/scalar-expr-evaluator.cc index a327dd4..b1ace96 100644 --- a/be/src/exprs/scalar-expr-evaluator.cc +++ b/be/src/exprs/scalar-expr-evaluator.cc @@ -62,21 +62,21 @@ using namespace impala_udf; const char* ScalarExprEvaluator::LLVM_CLASS_NAME = "class.impala::ScalarExprEvaluator"; -ScalarExprEvaluator::ScalarExprEvaluator(const ScalarExpr& root, MemPool* mem_pool) - : mem_pool_(mem_pool), - root_(root) { -} +ScalarExprEvaluator::ScalarExprEvaluator( + const ScalarExpr& root, MemPool* expr_perm_pool, MemPool* expr_results_pool) + : expr_perm_pool_(expr_perm_pool), root_(root) {} ScalarExprEvaluator::~ScalarExprEvaluator() { DCHECK(!initialized_ || closed_); } Status ScalarExprEvaluator::Create(const ScalarExpr& root, RuntimeState* state, - ObjectPool* pool, MemPool* mem_pool, ScalarExprEvaluator** eval) { - *eval = pool->Add(new ScalarExprEvaluator(root, mem_pool)); + ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool, + ScalarExprEvaluator** eval) { + *eval = pool->Add(new ScalarExprEvaluator(root, expr_perm_pool, expr_results_pool)); if (root.fn_ctx_idx_end_ > 0) { (*eval)->fn_ctxs_.resize(root.fn_ctx_idx_end_, nullptr); - (*eval)->CreateFnCtxs(state, root); + (*eval)->CreateFnCtxs(state, root, expr_perm_pool, expr_results_pool); DCHECK_EQ((*eval)->fn_ctxs_.size(), root.fn_ctx_idx_end_); for (FunctionContext* fn_ctx : (*eval)->fn_ctxs_) DCHECK(fn_ctx != nullptr); (*eval)->fn_ctxs_ptr_ = (*eval)->fn_ctxs_.data(); @@ -91,10 +91,11 @@ Status ScalarExprEvaluator::Create(const ScalarExpr& root, RuntimeState* state, } Status ScalarExprEvaluator::Create(const vector<ScalarExpr*>& exprs, RuntimeState* state, - ObjectPool* pool, MemPool* mem_pool, 
vector<ScalarExprEvaluator*>* evals) { + ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool, + vector<ScalarExprEvaluator*>* evals) { for (const ScalarExpr* expr : exprs) { ScalarExprEvaluator* eval; - Status status = Create(*expr, state, pool, mem_pool, &eval); + Status status = Create(*expr, state, pool, expr_perm_pool, expr_results_pool, &eval); // Always add the evaluator to the vector so it can be cleaned up. evals->push_back(eval); RETURN_IF_ERROR(status); @@ -102,12 +103,13 @@ Status ScalarExprEvaluator::Create(const vector<ScalarExpr*>& exprs, RuntimeStat return Status::OK(); } -void ScalarExprEvaluator::CreateFnCtxs(RuntimeState* state, const ScalarExpr& expr) { +void ScalarExprEvaluator::CreateFnCtxs(RuntimeState* state, const ScalarExpr& expr, + MemPool* expr_perm_pool, MemPool* expr_results_pool) { const int fn_ctx_idx = expr.fn_ctx_idx(); const bool has_fn_ctx = fn_ctx_idx != -1; vector<FunctionContext::TypeDesc> arg_types; for (const ScalarExpr* child : expr.children()) { - CreateFnCtxs(state, *child); + CreateFnCtxs(state, *child, expr_perm_pool, expr_results_pool); if (has_fn_ctx) arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(child->type())); } if (has_fn_ctx) { @@ -117,8 +119,8 @@ void ScalarExprEvaluator::CreateFnCtxs(RuntimeState* state, const ScalarExpr& ex DCHECK_GE(fn_ctx_idx, 0); DCHECK_LT(fn_ctx_idx, fn_ctxs_.size()); DCHECK(fn_ctxs_[fn_ctx_idx] == nullptr); - fn_ctxs_[fn_ctx_idx] = FunctionContextImpl::CreateContext( - state, mem_pool_, return_type, arg_types, varargs_buffer_size); + fn_ctxs_[fn_ctx_idx] = FunctionContextImpl::CreateContext(state, expr_perm_pool, + expr_results_pool, return_type, arg_types, varargs_buffer_size); } } @@ -151,8 +153,8 @@ void ScalarExprEvaluator::Close(RuntimeState* state) { delete fn_ctxs_[i]; } fn_ctxs_.clear(); - // Memory allocated by 'fn_ctx_' is still in 'mem_pool_'. It's the responsibility of - // the owner of 'mem_pool_' to free it. + // Memory allocated by 'fn_ctx_' is still in the MemPools. It's the responsibility of + // the owners of those pools to free it. 
closed_ = true; } @@ -162,12 +164,15 @@ void ScalarExprEvaluator::Close( } Status ScalarExprEvaluator::Clone(ObjectPool* pool, RuntimeState* state, - MemPool* mem_pool, ScalarExprEvaluator** cloned_eval) const { + MemPool* expr_perm_pool, MemPool* expr_results_pool, + ScalarExprEvaluator** cloned_eval) const { DCHECK(initialized_); DCHECK(opened_); - *cloned_eval = pool->Add(new ScalarExprEvaluator(root_, mem_pool)); + *cloned_eval = pool->Add( + new ScalarExprEvaluator(root_, expr_perm_pool, expr_results_pool)); for (int i = 0; i < fn_ctxs_.size(); ++i) { - (*cloned_eval)->fn_ctxs_.push_back(fn_ctxs_[i]->impl()->Clone(mem_pool)); + (*cloned_eval)->fn_ctxs_.push_back( + fn_ctxs_[i]->impl()->Clone(expr_perm_pool, expr_results_pool)); } (*cloned_eval)->fn_ctxs_ptr_ = (*cloned_eval)->fn_ctxs_.data(); (*cloned_eval)->is_clone_ = true; @@ -178,46 +183,20 @@ Status ScalarExprEvaluator::Clone(ObjectPool* pool, RuntimeState* state, } Status ScalarExprEvaluator::Clone(ObjectPool* pool, RuntimeState* state, - MemPool* mem_pool, const vector<ScalarExprEvaluator*>& evals, + MemPool* expr_perm_pool, MemPool* expr_results_pool, + const vector<ScalarExprEvaluator*>& evals, vector<ScalarExprEvaluator*>* cloned_evals) { DCHECK(cloned_evals != nullptr); DCHECK(cloned_evals->empty()); for (int i = 0; i < evals.size(); ++i) { ScalarExprEvaluator* cloned_eval; - RETURN_IF_ERROR(evals[i]->Clone(pool, state, mem_pool, &cloned_eval)); + RETURN_IF_ERROR( + evals[i]->Clone(pool, state, expr_perm_pool, expr_results_pool, &cloned_eval)); cloned_evals->push_back(cloned_eval); } return Status::OK(); } -bool ScalarExprEvaluator::HasLocalAllocations() const { - for (int i = 0; i < fn_ctxs_.size(); ++i) { - if (fn_ctxs_[i]->impl()->closed()) continue; - if (fn_ctxs_[i]->impl()->HasLocalAllocations()) return true; - } - return false; -} - -bool ScalarExprEvaluator::HasLocalAllocations( - const vector<ScalarExprEvaluator*>& evals) { - for (int i = 0; i < evals.size(); ++i) { - if (evals[i]->HasLocalAllocations()) return true; - } - return false; -} - -void ScalarExprEvaluator::FreeLocalAllocations() { - for (int i = 0; i < fn_ctxs_.size(); ++i) { - if (fn_ctxs_[i]->impl()->closed()) continue; - fn_ctxs_[i]->impl()->FreeLocalAllocations(); - } -} - -void ScalarExprEvaluator::FreeLocalAllocations( - const vector<ScalarExprEvaluator*>& evals) { - for (int i = 0; i < evals.size(); ++i) evals[i]->FreeLocalAllocations(); -} - Status ScalarExprEvaluator::GetError(int start_idx, int end_idx) const { DCHECK(opened_); end_idx = end_idx == -1 ? fn_ctxs_.size() : end_idx; @@ -241,9 +220,9 @@ Status ScalarExprEvaluator::GetConstValue(RuntimeState* state, const ScalarExpr& // A constant expression shouldn't have any SlotRefs expr in it. DCHECK_EQ(expr.GetSlotIds(), 0); - DCHECK(mem_pool_ != nullptr); + DCHECK(expr_perm_pool_ != nullptr); const ColumnType& result_type = expr.type(); - RETURN_IF_ERROR(AllocateAnyVal(state, mem_pool_, result_type, + RETURN_IF_ERROR(AllocateAnyVal(state, expr_perm_pool_, result_type, "Could not allocate constant expression value", const_val)); void* result = ScalarExprEvaluator::GetValue(expr, nullptr); @@ -252,9 +231,10 @@ Status ScalarExprEvaluator::GetConstValue(RuntimeState* state, const ScalarExpr& StringVal* sv = reinterpret_cast<StringVal*>(*const_val); if (!sv->is_null && sv->len > 0) { // Make sure the memory is owned by this evaluator. 
- char* ptr_copy = reinterpret_cast<char*>(mem_pool_->TryAllocateUnaligned(sv->len)); + char* ptr_copy = + reinterpret_cast<char*>(expr_perm_pool_->TryAllocateUnaligned(sv->len)); if (ptr_copy == nullptr) { - return mem_pool_->mem_tracker()->MemLimitExceeded( + return expr_perm_pool_->mem_tracker()->MemLimitExceeded( state, "Could not allocate constant string value", sv->len); } memcpy(ptr_copy, sv->ptr, sv->len);
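The cloning changes above make the per-thread contract explicit: each cloned evaluator gets its own pools. A call site following that contract would look roughly like this (a sketch; the thread-local names and 'opened_eval' are assumptions):

  // One pair of pools per execution thread, since MemPools are not thread-safe.
  MemPool thread_perm_pool(mem_tracker), thread_results_pool(mem_tracker);
  ScalarExprEvaluator* thread_eval;
  RETURN_IF_ERROR(opened_eval->Clone(
      obj_pool, state, &thread_perm_pool, &thread_results_pool, &thread_eval));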
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/scalar-expr-evaluator.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/scalar-expr-evaluator.h b/be/src/exprs/scalar-expr-evaluator.h index 5792d14..9a27951 100644 --- a/be/src/exprs/scalar-expr-evaluator.h +++ b/be/src/exprs/scalar-expr-evaluator.h @@ -71,19 +71,28 @@ class ScalarExprEvaluator { ~ScalarExprEvaluator(); /// Creates an evaluator for the scalar expression tree rooted at 'expr' and all - /// FunctionContexts needed during evaluation. Allocations from this evaluator will - /// be from 'mem_pool'. The newly created evaluator will be stored in 'pool' and - /// returned in 'eval'. Returns error status on failure. Note that it's the - /// responsibility to call Close() on all created evaluators even if this function - /// returns error on initialization failure. + /// FunctionContexts needed during evaluation. + /// + /// Permanent allocations (i.e. those that must live until the evaluator is closed) come + /// from 'expr_perm_pool'. Allocations that may contain expr results (i.e. the + /// results of GetValue(), GetStringVal(), etc.) come from 'expr_results_pool'. Lifetime + /// of memory in 'expr_results_pool' is managed by the owner of the pool and may be + /// freed by the owner at any time except when the evaluator is in the middle of + /// evaluating the expression. These pools can be shared between evaluators (so long as + /// the required memory lifetimes are compatible) but cannot be shared between threads + /// since MemPools are not thread-safe. + /// + /// Note that the caller is responsible for calling Close() on all evaluators even if + /// this function returns error status on initialization failure. static Status Create(const ScalarExpr& expr, RuntimeState* state, ObjectPool* pool, - MemPool* mem_pool, ScalarExprEvaluator** eval) WARN_UNUSED_RESULT; + MemPool* expr_perm_pool, MemPool* expr_results_pool, + ScalarExprEvaluator** eval) WARN_UNUSED_RESULT; /// Convenience function for creating multiple ScalarExprEvaluators. The evaluators /// are returned in 'evals'. static Status Create(const std::vector<ScalarExpr*>& exprs, RuntimeState* state, - ObjectPool* pool, MemPool* mem_pool, std::vector<ScalarExprEvaluator*>* evals) - WARN_UNUSED_RESULT; + ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool, + std::vector<ScalarExprEvaluator*>* evals) WARN_UNUSED_RESULT; /// Initializes the ScalarExprEvaluator on all nodes in the ScalarExpr tree. This is /// also the location in which constant arguments to functions are computed. Does not @@ -105,21 +114,22 @@ class ScalarExprEvaluator { /// Creates a copy of this ScalarExprEvaluator. Open() must be called first. The copy /// contains clones of each FunctionContext, which share the fragment-local state of the - /// original one but have their own FreePool and thread-local state. This should be used + /// original one but have their own memory and thread-local state. This should be used /// to create a ScalarExprEvaluator for each execution thread that needs to evaluate - /// 'root_'. All allocations will be from 'mem_pool' so callers should use different - /// MemPool for evaluators in different threads. Note that clones are considered opened. - /// The cloned ScalarExprEvaluator cannot be used after the original ScalarExprEvaluator - /// is destroyed because it may reference fragment-local state from the original. + /// 'root_'. 'expr_perm_pool' and 'expr_results_pool' are used for allocations so callers + /// must use different MemPools for evaluators in different threads. Note that clones + /// are considered opened. The cloned ScalarExprEvaluator cannot be used after the + /// original ScalarExprEvaluator is destroyed because it may reference fragment-local + /// state from the original. /// TODO: IMPALA-4743: Evaluate input arguments in ScalarExpr::Init() and store them /// in ScalarExpr. - Status Clone(ObjectPool* pool, RuntimeState* state, MemPool* mem_pool, - ScalarExprEvaluator** new_eval) const WARN_UNUSED_RESULT; + Status Clone(ObjectPool* pool, RuntimeState* state, MemPool* expr_perm_pool, + MemPool* expr_results_pool, ScalarExprEvaluator** new_eval) const WARN_UNUSED_RESULT; /// Convenience functions for cloning multiple ScalarExprEvaluators. The newly /// created evaluators are appended to 'new_evals'. - static Status Clone(ObjectPool* pool, RuntimeState* state, MemPool* mem_pool, - const std::vector<ScalarExprEvaluator*>& evals, + static Status Clone(ObjectPool* pool, RuntimeState* state, MemPool* expr_perm_pool, + MemPool* expr_results_pool, const std::vector<ScalarExprEvaluator*>& evals, std::vector<ScalarExprEvaluator*>* new_evals) WARN_UNUSED_RESULT; /// If 'expr' is constant, evaluates it with no input row argument and returns the @@ -166,15 +176,6 @@ class ScalarExprEvaluator { void PrintValue(void* value, std::string* str); void PrintValue(void* value, std::stringstream* stream); - /// Returns true if any of the expression contexts in the array has local allocations. - static bool HasLocalAllocations(const std::vector<ScalarExprEvaluator*>& evals); - bool HasLocalAllocations() const; - - /// Frees all local allocations made by fn_ctxs_. This can be called when result - /// data from this context is no longer needed. - void FreeLocalAllocations(); - static void FreeLocalAllocations(const std::vector<ScalarExprEvaluator*>& evals); - /// Get the number of digits after the decimal that should be displayed for this value. /// Returns -1 if no scale has been specified (currently the scale is only set for /// doubles set by RoundUpTo). GetValue() must have already been called. @@ -184,7 +185,7 @@ class ScalarExprEvaluator { bool opened() const { return opened_; } bool closed() const { return closed_; } bool is_clone() const { return is_clone_; } - MemPool* mem_pool() const { return mem_pool_; } + MemPool* expr_perm_pool() const { return expr_perm_pool_; } /// The builtin functions are not called from anywhere in the code and the /// symbols are therefore not included in the binary. We call these functions @@ -220,9 +221,10 @@ class ScalarExprEvaluator { /// to access the correct FunctionContext. FunctionContext** fn_ctxs_ptr_ = nullptr; - /// Pointer to the MemPool which all allocations (including fn_ctxs_') come from. - /// Owned by the exec node which owns this evaluator. - MemPool* mem_pool_; + /// Pointer to the MemPool which all permanent allocations (including those from + /// 'fn_ctxs_') come from.
Owned by the exec node or data sink which owns this + /// evaluator. + MemPool* const expr_perm_pool_; /// The expr tree which this evaluator is for. const ScalarExpr& root_; @@ -246,13 +248,14 @@ class ScalarExprEvaluator { /// TODO: move this to Expr initialization after IMPALA-4743 is fixed. int output_scale_ = -1; - ScalarExprEvaluator(const ScalarExpr& root, MemPool* mem_pool); + ScalarExprEvaluator(const ScalarExpr& root, MemPool* expr_perm_pool, + MemPool* expr_results_pool); /// Walks the expression tree 'expr' and fills in 'fn_ctxs_' for all Expr nodes /// which need FunctionContext. - void CreateFnCtxs(RuntimeState* state, const ScalarExpr& expr); + void CreateFnCtxs(RuntimeState* state, const ScalarExpr& expr, MemPool* expr_perm_pool, + MemPool* expr_results_pool); }; - } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/scalar-fn-call.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc index 73b497d..82b886c 100644 --- a/be/src/exprs/scalar-fn-call.cc +++ b/be/src/exprs/scalar-fn-call.cc @@ -170,7 +170,7 @@ Status ScalarFnCall::OpenEvaluator(FunctionContext::FunctionStateScope scope, vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals(); for (int i = 0; i < NumFixedArgs(); ++i) { AnyVal* input_val; - RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), children_[i]->type(), + RETURN_IF_ERROR(AllocateAnyVal(state, eval->expr_perm_pool(), children_[i]->type(), "Could not allocate expression value", &input_val)); input_vals->push_back(input_val); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/string-functions-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/string-functions-ir.cc b/be/src/exprs/string-functions-ir.cc index d0c8679..49bc8c1 100644 --- a/be/src/exprs/string-functions-ir.cc +++ b/be/src/exprs/string-functions-ir.cc @@ -294,7 +294,7 @@ StringVal StringFunctions::Replace(FunctionContext* context, const StringVal& st } StringVal result(context, buffer_space); - // If the result went over MAX_LENGTH, we can get a null result back + // result may be NULL if we went over MAX_LENGTH or the allocation failed. if (UNLIKELY(result.is_null)) return result; uint8_t* ptr = result.ptr; @@ -333,23 +333,22 @@ StringVal StringFunctions::Replace(FunctionContext* context, const StringVal& st // Also no overflow: min_output <= MAX_LENGTH and delta <= MAX_LENGTH - 1 const int64_t space_needed = min_output + delta; if (UNLIKELY(space_needed > buffer_space)) { - // Double at smaller sizes, but don't grow more than a megabyte a - // time at larger sizes. Reasoning: let the allocator do its job - // and don't depend on policy here. - static_assert(StringVal::MAX_LENGTH % (1 << 20) == 0, - "Math requires StringVal::MAX_LENGTH to be a multiple of 1MB"); - // Must compute next power of two using 64-bit math to avoid signed overflow - // The following DCHECK was supposed to be a static assertion, but C++11 is - // broken and doesn't declare std::min or std::max to be constexpr. 
Fix this - // when eventually the minimum supported standard is raised to at least C++14 - DCHECK_EQ(static_cast<int>(std::min<int64_t>( - BitUtil::RoundUpToPowerOfTwo(StringVal::MAX_LENGTH+1), - StringVal::MAX_LENGTH + (1 << 20))), - StringVal::MAX_LENGTH + (1 << 20)); - buffer_space = static_cast<int>(std::min<int64_t>( - BitUtil::RoundUpToPowerOfTwo(space_needed), - space_needed + (1 << 20))); - if (UNLIKELY(!result.Resize(context, buffer_space))) return StringVal::null(); + // Check to see if we can allocate a large enough buffer. + if (space_needed > StringVal::MAX_LENGTH) { + context->SetError( + "String length larger than allowed limit of 1 GB character data."); + return StringVal::null(); + } + // Double the buffer size whenever it fills up to amortise cost of resizing. + // Must compute next power of two using 64-bit math to avoid signed overflow. + buffer_space = min<int>(StringVal::MAX_LENGTH, + static_cast<int>(BitUtil::RoundUpToPowerOfTwo(space_needed))); + + // Give up if the allocation fails or we hit an error. This prevents us from + // continuing to blow past the mem limit. + if (UNLIKELY(!result.Resize(context, buffer_space) || context->has_error())) { + return StringVal::null(); + } // Don't forget to move the pointer ptr = result.ptr + bytes_produced; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index ad9f074..9c655ec 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -38,7 +38,6 @@ add_library(Runtime disk-io-mgr-scan-range.cc disk-io-mgr-stress.cc exec-env.cc - free-pool.cc fragment-instance-state.cc hbase-table.cc hbase-table-factory.cc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/data-stream-sender.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc index f39a595..dc26a23 100644 --- a/be/src/runtime/data-stream-sender.cc +++ b/be/src/runtime/data-stream-sender.cc @@ -390,7 +390,8 @@ Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* parent_mem_tra state_ = state; SCOPED_TIMER(profile_->total_time_counter()); RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state, - state->obj_pool(), expr_mem_pool(), &partition_expr_evals_)); + state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(), + &partition_expr_evals_)); bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES); uncompressed_bytes_counter_ = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); @@ -479,7 +480,7 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) { } } COUNTER_ADD(total_sent_rows_counter_, batch->num_rows()); - ScalarExprEvaluator::FreeLocalAllocations(partition_expr_evals_); + expr_results_pool_->Clear(); RETURN_IF_ERROR(state->CheckQueryState()); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/data-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index 8e85894..79a97b6 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -307,7 +307,8 @@ class DataStreamTest : public testing::Test { 
ordering_exprs_.push_back(lhs_slot); less_than_ = obj_pool_.Add(new TupleRowComparator(ordering_exprs_, is_asc_, nulls_first_)); - ASSERT_OK(less_than_->Open(&obj_pool_, runtime_state_.get(), mem_pool_.get())); + ASSERT_OK(less_than_->Open( + &obj_pool_, runtime_state_.get(), mem_pool_.get(), mem_pool_.get())); } // Create batch_, but don't fill it with data yet. Assumes we created row_desc_. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/descriptors.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc index 0f97f76..ef553fb 100644 --- a/be/src/runtime/descriptors.cc +++ b/be/src/runtime/descriptors.cc @@ -486,7 +486,7 @@ Status DescriptorTbl::CreatePartKeyExprs( // TODO: RowDescriptor should arguably be optional in Prepare for known literals. // Partition exprs are not used in the codegen case. Don't codegen them. RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_value_exprs, nullptr, - pool, nullptr, &part_desc->partition_key_value_evals_)); + pool, nullptr, nullptr, &part_desc->partition_key_value_evals_)); RETURN_IF_ERROR(ScalarExprEvaluator::Open( part_desc->partition_key_value_evals_, nullptr)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/free-pool.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/free-pool.cc b/be/src/runtime/free-pool.cc deleted file mode 100644 index df0569d..0000000 --- a/be/src/runtime/free-pool.cc +++ /dev/null @@ -1,28 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/free-pool.h" - -#include "common/names.h" - -using namespace impala; - -#ifndef NDEBUG - -AtomicInt32 FreePool::alloc_counts_(0); - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/free-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/free-pool.h b/be/src/runtime/free-pool.h index f4a3e4b..52c7137 100644 --- a/be/src/runtime/free-pool.h +++ b/be/src/runtime/free-pool.h @@ -25,13 +25,11 @@ #include <sstream> #include <unordered_map> -#include "common/atomic.h" #include "common/logging.h" #include "gutil/dynamic_annotations.h" #include "runtime/mem-pool.h" #include "util/bit-util.h" -DECLARE_int32(stress_free_pool_alloc); DECLARE_bool(disable_mem_pools); namespace impala { @@ -62,12 +60,6 @@ class FreePool { /// Allocates a buffer of size between [0, 2^62 - 1 - sizeof(FreeListNode)] bytes. 
uint8_t* Allocate(const int64_t requested_size) { DCHECK_GE(requested_size, 0); -#ifndef NDEBUG - if (FLAGS_stress_free_pool_alloc > 0 && - (alloc_counts_.Add(1) % FLAGS_stress_free_pool_alloc) == 0) { - return nullptr; - } -#endif /// Return a non-nullptr dummy pointer. nullptr is reserved for failures. if (UNLIKELY(requested_size == 0)) return mem_pool_->EmptyAllocPtr(); ++net_allocations_; @@ -140,12 +132,6 @@ class FreePool { /// nullptr will be returned on allocation failure. It's the caller's responsibility to /// free the memory buffer pointed to by "ptr" in this case. uint8_t* Reallocate(uint8_t* ptr, int64_t size) { -#ifndef NDEBUG - if (FLAGS_stress_free_pool_alloc > 0 && - (alloc_counts_.Add(1) % FLAGS_stress_free_pool_alloc) == 0) { - return nullptr; - } -#endif if (UNLIKELY(ptr == nullptr || ptr == mem_pool_->EmptyAllocPtr())) return Allocate(size); if (FLAGS_disable_mem_pools) { return reinterpret_cast<uint8_t*>(realloc(reinterpret_cast<void*>(ptr), size)); @@ -258,12 +244,6 @@ class FreePool { /// Diagnostic counter that tracks (# Allocates - # Frees) int64_t net_allocations_; - -#ifndef NDEBUG - /// Counter for tracking the number of allocations. Used only if the - /// the stress flag FLAGS_stress_free_pool_alloc is set. - static AtomicInt32 alloc_counts_; -#endif }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/sorted-run-merger.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc index 673d6c3..8f5ede9 100644 --- a/be/src/runtime/sorted-run-merger.cc +++ b/be/src/runtime/sorted-run-merger.cc @@ -172,9 +172,6 @@ Status SortedRunMerger::GetNext(RowBatch* output_batch, bool* eos) { output_batch->CommitLastRow(); RETURN_IF_ERROR(AdvanceMinRow(output_batch)); } - // Free local allocations made by comparator_.Less(); - comparator_.FreeLocalAllocations(); - *eos = min_heap_.empty(); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/sorter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index 16984ca..b85e61b 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -541,8 +541,8 @@ class Sorter::TupleIterator { /// instance to check for cancellation during an in-memory sort. class Sorter::TupleSorter { public: - TupleSorter(const TupleRowComparator& comparator, int64_t page_size, int tuple_size, - RuntimeState* state); + TupleSorter(Sorter* parent, const TupleRowComparator& comparator, int64_t page_size, + int tuple_size, RuntimeState* state); ~TupleSorter(); @@ -555,6 +555,8 @@ class Sorter::TupleSorter { private: static const int INSERTION_THRESHOLD = 16; + Sorter* const parent_; + /// Size of the tuples in memory. const int tuple_size_; @@ -562,7 +564,7 @@ class Sorter::TupleSorter { const TupleRowComparator& comparator_; /// Number of times comparator_.Less() can be invoked again before - /// comparator_.FreeLocalAllocations() needs to be called. + /// parent_->expr_results_pool_.Clear() needs to be called. int num_comparisons_till_free_; /// Runtime state instance to check for cancellation. Not owned. @@ -581,7 +583,7 @@ class Sorter::TupleSorter { /// high: Mersenne Twister should be more than adequate. mt19937_64 rng_; - /// Wrapper around comparator_.Less(). Also call comparator_.FreeLocalAllocations() + /// Wrapper around comparator_.Less().
Also call parent_->expr_results_pool_.Clear() /// on every 'state_->batch_size()' invocations of comparator_.Less(). Returns true /// if 'lhs' is less than 'rhs'. bool Less(const TupleRow* lhs, const TupleRow* rhs); @@ -1251,9 +1253,10 @@ void Sorter::TupleIterator::PrevPage(Sorter::Run* run, int tuple_size) { tuple_ = run->fixed_len_pages_[page_index_].data() + last_tuple_page_offset; } -Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t page_size, - int tuple_size, RuntimeState* state) - : tuple_size_(tuple_size), +Sorter::TupleSorter::TupleSorter(Sorter* parent, const TupleRowComparator& comp, + int64_t page_size, int tuple_size, RuntimeState* state) + : parent_(parent), + tuple_size_(tuple_size), comparator_(comp), num_comparisons_till_free_(state->batch_size()), state_(state) { @@ -1270,7 +1273,7 @@ bool Sorter::TupleSorter::Less(const TupleRow* lhs, const TupleRow* rhs) { --num_comparisons_till_free_; DCHECK_GE(num_comparisons_till_free_, 0); if (UNLIKELY(num_comparisons_till_free_ == 0)) { - comparator_.FreeLocalAllocations(); + parent_->expr_results_pool_.Clear(); num_comparisons_till_free_ = state_->batch_size(); } return comparator_.Less(lhs, rhs); @@ -1463,14 +1466,17 @@ inline void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tup memcpy(right, swap_tuple, tuple_size); } -Sorter::Sorter(const TupleRowComparator& compare_less_than, +Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs, + const std::vector<bool>& is_asc_order, const std::vector<bool>& nulls_first, const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc, MemTracker* mem_tracker, BufferPool::ClientHandle* buffer_pool_client, int64_t page_len, RuntimeProfile* profile, RuntimeState* state, int node_id, bool enable_spilling) : node_id_(node_id), state_(state), - compare_less_than_(compare_less_than), + expr_perm_pool_(mem_tracker), + expr_results_pool_(mem_tracker), + compare_less_than_(ordering_exprs, is_asc_order, nulls_first), in_mem_tuple_sorter_(NULL), buffer_pool_client_(buffer_pool_client), page_len_(page_len), @@ -1495,7 +1501,7 @@ Sorter::~Sorter() { DCHECK(merge_output_run_ == NULL); } -Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) { +Status Sorter::Prepare(ObjectPool* obj_pool) { DCHECK(in_mem_tuple_sorter_ == NULL) << "Already prepared"; // Page byte offsets are packed into uint32_t values, which limits the supported // page size.
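The TupleSorter changes above bound memory growth during an in-memory sort: Less() clears the parent Sorter's 'expr_results_pool_' after every 'state_->batch_size()' comparisons, since results from already-completed comparisons are dead by then. A self-contained sketch of the same pattern (the arena and comparator below are illustrative stand-ins, not Impala types):

  #include <string>
  #include <vector>

  // Toy arena standing in for Impala's MemPool: owns scratch allocations made
  // while comparing rows.
  struct ResultArena {
    std::vector<std::string> results;
    void Clear() { results.clear(); }
  };

  // Comparator wrapper that clears the arena after every 'batch_size'
  // comparisons, mirroring Sorter::TupleSorter::Less() in the hunk above.
  class PeriodicClearingLess {
   public:
    PeriodicClearingLess(ResultArena* arena, int batch_size)
      : arena_(arena), batch_size_(batch_size), until_clear_(batch_size) {}

    bool operator()(int lhs, int rhs) {
      if (--until_clear_ == 0) {
        arena_->Clear();  // results of earlier comparisons are no longer live
        until_clear_ = batch_size_;
      }
      // A real comparator may stage expression results in the arena before
      // comparing them; the toy version just compares directly.
      return lhs < rhs;
    }

   private:
    ResultArena* arena_;
    int batch_size_;
    int until_clear_;
  };
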
@@ -1513,7 +1519,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) { PrettyPrinter::Print(state_->query_options().max_row_size, TUnit::BYTES)); } has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots(); - in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_, page_len_, + in_mem_tuple_sorter_.reset(new TupleSorter(this, compare_less_than_, page_len_, sort_tuple_desc->byte_size(), state_)); if (enable_spilling_) { @@ -1528,25 +1534,26 @@ Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) { run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", TUnit::UNIT); RETURN_IF_ERROR(ScalarExprEvaluator::Create(sort_tuple_exprs_, state_, obj_pool, - expr_mem_pool, &sort_tuple_expr_evals_)); + &expr_perm_pool_, &expr_results_pool_, &sort_tuple_expr_evals_)); return Status::OK(); } +Status Sorter::Codegen(RuntimeState* state) { + return compare_less_than_.Codegen(state); +} + Status Sorter::Open() { DCHECK(in_mem_tuple_sorter_ != NULL) << "Not prepared"; DCHECK(unsorted_run_ == NULL) << "Already open"; + RETURN_IF_ERROR(compare_less_than_.Open(&obj_pool_, state_, &expr_perm_pool_, + &expr_results_pool_)); TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0]; - unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true)); + unsorted_run_ = run_pool_.Add(new Run(this, sort_tuple_desc, true)); RETURN_IF_ERROR(unsorted_run_->Init()); RETURN_IF_ERROR(ScalarExprEvaluator::Open(sort_tuple_expr_evals_, state_)); return Status::OK(); } -void Sorter::FreeLocalAllocations() { - compare_less_than_.FreeLocalAllocations(); - ScalarExprEvaluator::FreeLocalAllocations(sort_tuple_expr_evals_); -} - int64_t Sorter::ComputeMinReservation() { // Must be kept in sync with SortNode.computeNodeResourceProfile() in fe. int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1; @@ -1572,16 +1579,20 @@ Status Sorter::AddBatch(RowBatch* batch) { RETURN_IF_ERROR(SortCurrentInputRun()); RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages()); unsorted_run_ = - obj_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], true)); + run_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], true)); RETURN_IF_ERROR(unsorted_run_->Init()); } } + // Clear any temporary allocations made while materializing the sort tuples. + expr_results_pool_.Clear(); return Status::OK(); } Status Sorter::AddBatchNoSpill(RowBatch* batch, int start_index, int* num_processed) { DCHECK(batch != nullptr); RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, num_processed)); + // Clear any temporary allocations made while materializing the sort tuples. + expr_results_pool_.Clear(); return Status::OK(); } @@ -1617,7 +1628,10 @@ Status Sorter::GetNext(RowBatch* output_batch, bool* eos) { DCHECK(sorted_runs_.back()->is_pinned()); return sorted_runs_.back()->GetNext<false>(output_batch, eos); } else { - return merger_->GetNext(output_batch, eos); + RETURN_IF_ERROR(merger_->GetNext(output_batch, eos)); + // Clear any temporary allocations made by the merger. + expr_results_pool_.Clear(); + return Status::OK(); } } @@ -1626,13 +1640,16 @@ void Sorter::Reset() { merger_.reset(); // Free resources from the current runs. 
CleanupAllRuns(); - obj_pool_.Clear(); + compare_less_than_.Close(state_); } void Sorter::Close(RuntimeState* state) { CleanupAllRuns(); - obj_pool_.Clear(); + compare_less_than_.Close(state); ScalarExprEvaluator::Close(sort_tuple_expr_evals_, state); + expr_perm_pool_.FreeAll(); + expr_results_pool_.FreeAll(); + obj_pool_.Clear(); } void Sorter::CleanupAllRuns() { @@ -1642,6 +1659,7 @@ void Sorter::CleanupAllRuns() { unsorted_run_ = NULL; if (merge_output_run_ != NULL) merge_output_run_->CloseAllPages(); merge_output_run_ = NULL; + run_pool_.Clear(); } Status Sorter::SortCurrentInputRun() { @@ -1683,7 +1701,7 @@ Status Sorter::MergeIntermediateRuns() { // intermediate merges. // TODO: this isn't optimal: we could defer creating the merged run if we have // reliable reservations (IMPALA-3200). - merge_output_run_ = obj_pool_.Add( + merge_output_run_ = run_pool_.Add( new Run(this, output_row_desc_->tuple_descriptors()[0], false)); RETURN_IF_ERROR(merge_output_run_->Init()); RETURN_IF_ERROR(CreateMerger(num_runs_to_merge)); @@ -1754,6 +1772,8 @@ Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) { // Copy rows into the new run until done. int num_copied; RETURN_IF_CANCELLED(state_); + // Clear any temporary allocations made by the merger. + expr_results_pool_.Clear(); RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos)); RETURN_IF_ERROR( merged_run->AddIntermediateBatch(&intermediate_merge_batch, 0, &num_copied)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/sorter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h index 5e7240b..2527958 100644 --- a/be/src/runtime/sorter.h +++ b/be/src/runtime/sorter.h @@ -93,18 +93,18 @@ class RowBatch; class Sorter { public: /// 'sort_tuple_exprs' are the slot exprs used to materialize the tuples to be - /// sorted. 'compare_less_than' is a comparator for the sort tuples (returns true if - /// lhs < rhs). 'merge_batch_size_' is the size of the batches created to provide rows - /// to the merger and retrieve rows from an intermediate merger. 'node_id' is the ID of - /// the exec node using the sorter for error reporting. 'enable_spilling' should be set - /// to false to reduce the number of requested buffers if the caller will use - /// AddBatchNoSpill(). + /// sorted. 'ordering_exprs', 'is_asc_order' and 'nulls_first' are parameters + /// for the comparator for the sort tuples. + /// 'node_id' is the ID of the exec node using the sorter for error reporting. + /// 'enable_spilling' should be set to false to reduce the number of requested buffers + /// if the caller will use AddBatchNoSpill(). /// /// The Sorter assumes that it has exclusive use of the client's /// reservations for sorting, and may increase the size of the client's reservation. /// The caller is responsible for ensuring that the minimum reservation (returned from /// ComputeMinReservation()) is available. - Sorter(const TupleRowComparator& compare_less_than, + Sorter(const std::vector<ScalarExpr*>& ordering_exprs, + const std::vector<bool>& is_asc_order, const std::vector<bool>& nulls_first, const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc, MemTracker* mem_tracker, BufferPool::ClientHandle* client, int64_t page_len, RuntimeProfile* profile, RuntimeState* state, int node_id, @@ -113,8 +113,11 @@ class Sorter { /// Initial set-up of the sorter for execution. 
/// The evaluators for 'sort_tuple_exprs_' will be created and stored in 'obj_pool'. - /// All allocation from the evaluators will be from 'expr_mem_pool'. - Status Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) WARN_UNUSED_RESULT; + Status Prepare(ObjectPool* obj_pool) WARN_UNUSED_RESULT; + + /// Do codegen for the Sorter. Called after Prepare() if codegen is desired. Returns OK + /// if successful, otherwise an error status describing why codegen failed. + Status Codegen(RuntimeState* state); /// Opens the sorter for adding rows and initializes the evaluators for materializing /// the tuples. Must be called after Prepare() or Reset() and before calling AddBatch(). @@ -137,9 +140,6 @@ class Sorter { /// Get the next batch of sorted output rows from the sorter. Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT; - /// Free any local allocations made when materializing and sorting the tuples. - void FreeLocalAllocations(); - /// Resets all internal state like ExecNode::Reset(). /// Init() must have been called, AddBatch()/GetNext()/InputDone() /// may or may not have been called. @@ -196,8 +196,15 @@ class Sorter { /// Runtime state instance used to check for cancellation. Not owned. RuntimeState* const state_; + /// MemPool for allocating data structures used by expression evaluators in the sorter. + MemPool expr_perm_pool_; + + /// MemPool for allocations that hold results of expression evaluation in the sorter. + /// Cleared periodically during sorting to prevent memory from accumulating. + MemPool expr_results_pool_; + /// In memory sorter and less-than comparator. - const TupleRowComparator& compare_less_than_; + TupleRowComparator compare_less_than_; boost::scoped_ptr<TupleSorter> in_mem_tuple_sorter_; /// Client used to allocate pages from the buffer pool. Not owned. @@ -251,7 +258,7 @@ class Sorter { Run* merge_output_run_; /// Pool of owned Run objects. Maintains Run objects across non-freeing Reset() calls. - ObjectPool obj_pool_; + ObjectPool run_pool_; /// END: Members that must be Reset() ///////////////////////////////////////// @@ -259,6 +266,9 @@ class Sorter { /// Runtime profile and counters for this sorter instance. RuntimeProfile* profile_; + /// Pool of objects (e.g. exprs) that are not freed during Reset() calls. + ObjectPool obj_pool_; + /// Number of initial runs created. RuntimeProfile::Counter* initial_runs_counter_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/tuple.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc index 86f199d..252175f 100644 --- a/be/src/runtime/tuple.cc +++ b/be/src/runtime/tuple.cc @@ -233,8 +233,7 @@ void Tuple::MaterializeExprs(TupleRow* row, const TupleDescriptor& desc, } // Codegens an unrolled version of MaterializeExprs(). Uses codegen'd exprs and slot -// writes. If 'pool' is non-NULL, string data is copied into it. Note that the generated -// function ignores its 'pool' arg; instead we hardcode the pointer in the IR. +// writes. If 'pool' is non-NULL, string data is copied into it.
// // Example IR for materializing a string column with non-NULL 'pool': // http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/service/fe-support.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index 54b78a5..b7a5e81 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -206,7 +206,7 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow( exprs.push_back(expr); ScalarExprEvaluator* eval; status = ScalarExprEvaluator::Create(*expr, &state, &obj_pool, &expr_mem_pool, - &eval); + &expr_mem_pool, &eval); evals.push_back(eval); if (!status.ok()) goto error; }
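Taken together, the pattern this change establishes for callers of ScalarExprEvaluator is: permanent state lives in 'expr_perm_pool' until Close(), while expr results are reclaimed in bulk by clearing 'expr_results_pool' between batches. A sketch of the intended call sequence (HasMoreBatches() and ProcessBatch() are placeholders for the caller's own control flow, not code from this patch):

  Status EvaluateAllBatches(const ScalarExpr& expr, RuntimeState* state,
      ObjectPool* obj_pool, MemTracker* mem_tracker) {
    MemPool expr_perm_pool(mem_tracker);     // freed once, after Close()
    MemPool expr_results_pool(mem_tracker);  // cleared after every batch
    ScalarExprEvaluator* eval;
    Status status = ScalarExprEvaluator::Create(
        expr, state, obj_pool, &expr_perm_pool, &expr_results_pool, &eval);
    if (status.ok()) status = eval->Open(state);
    while (status.ok() && HasMoreBatches()) {
      ProcessBatch(eval);           // GetValue() etc. allocate from expr_results_pool
      expr_results_pool.Clear();    // legal here: no evaluation is in progress
    }
    eval->Close(state);             // required even if Create() returned an error
    expr_perm_pool.FreeAll();
    expr_results_pool.FreeAll();
    return status;
  }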

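The resizing logic added to StringFunctions::Replace() earlier in this change grows the output buffer by rounding the required space up to the next power of two, capped at StringVal::MAX_LENGTH (1 GB). A self-contained illustration of that sizing rule (the constant and helper below are local stand-ins for StringVal::MAX_LENGTH and BitUtil::RoundUpToPowerOfTwo, not Impala code):

  #include <algorithm>
  #include <cstdint>

  constexpr int64_t kMaxLength = 1LL << 30;  // stand-in for StringVal::MAX_LENGTH

  int64_t RoundUpToPowerOfTwo(int64_t v) {
    int64_t p = 1;
    while (p < v) p <<= 1;
    return p;
  }

  // Returns the new buffer size, or -1 if 'space_needed' exceeds the 1 GB cap
  // (the patch raises an error in that case). Rounding is done in 64-bit
  // arithmetic so values just under the cap cannot overflow; the cap itself is
  // a power of two, so the min() never truncates a legal request.
  int64_t GrowBuffer(int64_t space_needed) {
    if (space_needed > kMaxLength) return -1;
    return std::min<int64_t>(kMaxLength, RoundUpToPowerOfTwo(space_needed));
  }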