http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/udf/udf-internal.h ---------------------------------------------------------------------- diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h index 0d75d81..840efa9 100644 --- a/be/src/udf/udf-internal.h +++ b/be/src/udf/udf-internal.h @@ -56,52 +56,54 @@ class ScalarExpr; class FunctionContextImpl { public: /// Create a FunctionContext for a UDF. Caller is responsible for deleting it. - static impala_udf::FunctionContext* CreateContext(RuntimeState* state, MemPool* pool, + /// UDF-managed allocations (i.e. Allocate()) are backed by 'perm_pool' and + /// allocations that may hold expr results (i.e. AllocateForResults()) are backed + /// by 'results_pool'. + static impala_udf::FunctionContext* CreateContext(RuntimeState* state, + MemPool* perm_pool, MemPool* results_pool, const impala_udf::FunctionContext::TypeDesc& return_type, const std::vector<impala_udf::FunctionContext::TypeDesc>& arg_types, int varargs_buffer_size = 0, bool debug = false); /// Create a FunctionContext for a UDA. Identical to the UDF version except for the /// intermediate type. Caller is responsible for deleting it. - static impala_udf::FunctionContext* CreateContext(RuntimeState* state, MemPool* pool, + static impala_udf::FunctionContext* CreateContext(RuntimeState* state, + MemPool* perm_pool, MemPool* results_pool, const impala_udf::FunctionContext::TypeDesc& intermediate_type, const impala_udf::FunctionContext::TypeDesc& return_type, const std::vector<impala_udf::FunctionContext::TypeDesc>& arg_types, int varargs_buffer_size = 0, bool debug = false); FunctionContextImpl(impala_udf::FunctionContext* parent); + ~FunctionContextImpl(); - /// Checks for any outstanding memory allocations. If there is unfreed memory, adds a - /// warning and frees the allocations. Note that local allocations are freed with the - /// MemPool backing pool_. + /// Checks for any outstanding memory allocations. 
If there is (non-result) memory that + /// was allocated by the UDF via this FunctionContext but not freed, adds a warning + /// and frees the allocations. void Close(); /// Returns a new FunctionContext with the same constant args, fragment-local state, and /// debug flag as this FunctionContext. The caller is responsible for calling delete on /// it. The cloned FunctionContext cannot be used after the original FunctionContext is /// destroyed because it may reference fragment-local state from the original. - impala_udf::FunctionContext* Clone(MemPool* pool); + impala_udf::FunctionContext* Clone(MemPool* perm_pool, MemPool* results_pool); - /// Allocates a buffer of 'byte_size' with "local" memory management. - /// If the new allocation causes the memory limit to be exceeded, the error will be set - /// in this object causing the query to fail. + /// Allocates a buffer of 'byte_size' to hold expr results. If the new allocation + /// causes the memory limit to be exceeded, the error will be set in this object + /// causing the query to fail. /// - /// These allocations are not freed one by one but freed as a pool by - /// FreeLocalAllocations(). This is used where the lifetime of the allocation is clear. - /// For UDFs, the allocations can be freed at the row level. - /// TODO: free them at the batch level and save some copies? - uint8_t* AllocateLocal(int64_t byte_size) noexcept; - - /// Resize a local allocation. - /// If the new allocation causes the memory limit to be exceeded, the error will be set - /// in this object causing the query to fail. - uint8_t* ReallocateLocal(uint8_t* ptr, int64_t byte_size) noexcept; - - /// Frees all allocations returned by AllocateLocal(). - void FreeLocalAllocations() noexcept; - - /// Returns true if there are any allocations returned by AllocateLocal(). - bool HasLocalAllocations() const { return !local_allocations_.empty(); } + /// These allocations live in the 'results_pool' passed to CreateContext() or Clone().
+ /// 'results_pool' is managed by the Impala runtime and can be safely cleared + /// whenever memory returned by the expression is no longer referenced. + uint8_t* AllocateForResults(int64_t byte_size) noexcept; + + /// Replaces the current 'results_pool_' with 'new_results_pool', to be used for + /// AllocateForResults(). Returns a pointer to the pool that was replaced. + MemPool* SwapResultsPool(MemPool* new_results_pool) { + MemPool* old_results_pool = results_pool_; + results_pool_ = new_results_pool; + return old_results_pool; + } /// Sets the constant arg list. The vector should contain one entry per argument, /// with a non-NULL entry if the argument is constant. The AnyVal* values are @@ -189,14 +191,14 @@ class FunctionContextImpl { friend class ScalarExprEvaluator; /// A utility function which checks for memory limits and null pointers returned by - /// Allocate(), Reallocate() and AllocateLocal() and sets the appropriate error status + /// Allocate(), Reallocate() and AllocateForResults() and sets the appropriate error status /// if necessary. /// /// Return false if 'buf' is null; returns true otherwise. bool CheckAllocResult(const char* fn_name, uint8_t* buf, int64_t byte_size); /// A utility function which checks for memory limits that may have been exceeded by - /// Allocate(), Reallocate(), AllocateLocal() or TrackAllocation(). Sets the + /// Allocate(), Reallocate(), AllocateForResults() or TrackAllocation(). Sets the /// appropriate error status if necessary. void CheckMemLimit(const char* fn_name, int64_t byte_size); @@ -209,8 +211,18 @@ class FunctionContextImpl { /// Parent context object. Not owned impala_udf::FunctionContext* context_; - /// Pool to service allocations from. - FreePool* pool_; + /// Pool used for allocations made via Allocate(). Allocations are explicitly freed and + /// returned to this pool with Free(). The memory allocated in this pool is effectively + /// owned by the UDF. + /// Owned and freed in destructor.
Uses raw pointer to avoid pulling headers into SDK. + FreePool* udf_pool_; + + /// Pool used for allocations made via AllocateForResults(). Not owned by this + /// FunctionContext. Allocations made from the pool are used temporarily during + /// expression evaluation. Var-len values returned from an expression may reference + /// memory in this pool - the caller is responsible for ensuring that the pool is + /// not cleared while that memory is still referenced. + MemPool* results_pool_; /// We use the query's runtime state to report errors and warnings. NULL for test /// contexts. @@ -234,8 +246,6 @@ class FunctionContextImpl { /// Allocations made and still owned by the user function. Only used if debug_ is true /// because it is very expensive to maintain. std::map<uint8_t*, int> allocations_; - /// Allocations owned by Impala. - std::vector<uint8_t*> local_allocations_; /// The function state accessed via FunctionContext::Get/SetFunctionState() void* thread_local_fn_state_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/udf/udf-test-harness.cc ---------------------------------------------------------------------- diff --git a/be/src/udf/udf-test-harness.cc b/be/src/udf/udf-test-harness.cc index 46cbbe0..ada8fe1 100644 --- a/be/src/udf/udf-test-harness.cc +++ b/be/src/udf/udf-test-harness.cc @@ -30,7 +30,8 @@ FunctionContext* UdfTestHarness::CreateTestContext( const FunctionContext::TypeDesc& return_type, const vector<FunctionContext::TypeDesc>& arg_types, RuntimeState* state, MemPool* pool) { - return FunctionContextImpl::CreateContext(state, pool, return_type, arg_types, 0, true); + return FunctionContextImpl::CreateContext( + state, pool, pool, return_type, arg_types, 0, true); } void UdfTestHarness::SetConstantArgs( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/udf/udf.cc ---------------------------------------------------------------------- diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc index 82dd4d8..89eb8d1 100644 --- a/be/src/udf/udf.cc +++ b/be/src/udf/udf.cc @@ -41,7 +41,6 @@ // in the main binary, which does include FreePool. 
#define VLOG_ROW while(false) std::cout -#define VLOG_ROW_IS_ON (false) namespace impala { @@ -77,6 +76,13 @@ class FreePool { int64_t net_allocations_; }; +class MemPool { + public: + uint8_t* Allocate(int byte_size) { + return reinterpret_cast<uint8_t*>(malloc(byte_size)); + } +}; + class RuntimeState { public: void SetQueryStatus(const std::string& error_msg) { @@ -105,8 +111,10 @@ class RuntimeState { } #else +#include "common/atomic.h" #include "exprs/anyval-util.h" #include "runtime/free-pool.h" +#include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" #include "runtime/runtime-state.h" #endif @@ -129,26 +137,44 @@ static const int MAX_WARNINGS = 1000; static_assert(__BYTE_ORDER == __LITTLE_ENDIAN, "DecimalVal memory layout assumes little-endianness"); -FunctionContext* FunctionContextImpl::CreateContext(RuntimeState* state, MemPool* pool, +#if !defined(NDEBUG) && !defined(IMPALA_UDF_SDK_BUILD) +DECLARE_int32(stress_fn_ctx_alloc); + +namespace { +/// Counter for tracking the number of allocations. Used only if +/// the stress flag FLAGS_stress_fn_ctx_alloc is set to a positive value.
+AtomicInt32 alloc_counts(0); + +bool FailNextAlloc() { + return FLAGS_stress_fn_ctx_alloc > 0 && + (alloc_counts.Add(1) % FLAGS_stress_fn_ctx_alloc) == 0; +} +} +#endif + +FunctionContext* FunctionContextImpl::CreateContext(RuntimeState* state, + MemPool* udf_mem_pool, MemPool* results_pool, const FunctionContext::TypeDesc& return_type, - const vector<FunctionContext::TypeDesc>& arg_types, - int varargs_buffer_size, bool debug) { + const vector<FunctionContext::TypeDesc>& arg_types, int varargs_buffer_size, + bool debug) { FunctionContext::TypeDesc invalid_type; invalid_type.type = FunctionContext::INVALID_TYPE; invalid_type.precision = 0; invalid_type.scale = 0; - return FunctionContextImpl::CreateContext(state, pool, invalid_type, return_type, - arg_types, varargs_buffer_size, debug); + return FunctionContextImpl::CreateContext(state, udf_mem_pool, results_pool, + invalid_type, return_type, arg_types, varargs_buffer_size, debug); } -FunctionContext* FunctionContextImpl::CreateContext(RuntimeState* state, MemPool* pool, +FunctionContext* FunctionContextImpl::CreateContext(RuntimeState* state, + MemPool* udf_mem_pool, MemPool* results_pool, const FunctionContext::TypeDesc& intermediate_type, const FunctionContext::TypeDesc& return_type, - const vector<FunctionContext::TypeDesc>& arg_types, - int varargs_buffer_size, bool debug) { + const vector<FunctionContext::TypeDesc>& arg_types, int varargs_buffer_size, + bool debug) { impala_udf::FunctionContext* ctx = new impala_udf::FunctionContext(); + ctx->impl_->udf_pool_ = new FreePool(udf_mem_pool); + ctx->impl_->results_pool_ = results_pool; ctx->impl_->state_ = state; - ctx->impl_->pool_ = new FreePool(pool); ctx->impl_->intermediate_type_ = intermediate_type; ctx->impl_->return_type_ = return_type; ctx->impl_->arg_types_ = arg_types; @@ -156,14 +182,15 @@ FunctionContext* FunctionContextImpl::CreateContext(RuntimeState* state, MemPool aligned_malloc(varargs_buffer_size, VARARGS_BUFFER_ALIGNMENT)); 
ctx->impl_->varargs_buffer_size_ = varargs_buffer_size; ctx->impl_->debug_ = debug; - VLOG_ROW << "Created FunctionContext: " << ctx << " with pool " << ctx->impl_->pool_; + VLOG_ROW << "Created FunctionContext: " << ctx; return ctx; } -FunctionContext* FunctionContextImpl::Clone(MemPool* pool) { +FunctionContext* FunctionContextImpl::Clone( + MemPool* udf_mem_pool, MemPool* results_pool) { impala_udf::FunctionContext* new_context = - CreateContext(state_, pool, intermediate_type_, return_type_, arg_types_, - varargs_buffer_size_, debug_); + CreateContext(state_, udf_mem_pool, results_pool, intermediate_type_, + return_type_, arg_types_, varargs_buffer_size_, debug_); new_context->impl_->constant_args_ = constant_args_; new_context->impl_->fragment_local_fn_state_ = fragment_local_fn_state_; return new_context; @@ -174,7 +201,6 @@ FunctionContext::FunctionContext() : impl_(new FunctionContextImpl(this)) { FunctionContext::~FunctionContext() { assert(impl_->closed_ && "FunctionContext wasn't closed!"); - delete impl_->pool_; delete impl_; } @@ -182,7 +208,8 @@ FunctionContextImpl::FunctionContextImpl(FunctionContext* parent) : varargs_buffer_(NULL), varargs_buffer_size_(0), context_(parent), - pool_(NULL), + udf_pool_(NULL), + results_pool_(NULL), state_(NULL), debug_(false), version_(FunctionContext::v1_3), @@ -192,22 +219,20 @@ FunctionContextImpl::FunctionContextImpl(FunctionContext* parent) thread_local_fn_state_(NULL), fragment_local_fn_state_(NULL), external_bytes_tracked_(0), - closed_(false) { + closed_(false) {} + +FunctionContextImpl::~FunctionContextImpl() { + delete udf_pool_; } void FunctionContextImpl::Close() { if (closed_) return; - // Free local allocations first so we can detect leaks through any remaining allocations - // (local allocations cannot be leaked, at least not by the UDF) - FreeLocalAllocations(); - stringstream error_ss; if (!debug_) { - if (pool_->net_allocations() > 0) { - error_ss << "Memory leaked via FunctionContext::Allocate() " 
- << "or FunctionContext::AllocateLocal()"; - } else if (pool_->net_allocations() < 0) { + if (udf_pool_->net_allocations() > 0) { + error_ss << "Memory leaked via FunctionContext::Allocate()"; + } else if (udf_pool_->net_allocations() < 0) { error_ss << "FunctionContext::Free() called on buffer that was already freed or " "was not allocated."; } @@ -293,10 +318,9 @@ inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name, return true; } -inline void FunctionContextImpl::CheckMemLimit(const char* fn_name, - int64_t byte_size) { +inline void FunctionContextImpl::CheckMemLimit(const char* fn_name, int64_t byte_size) { #ifndef IMPALA_UDF_SDK_BUILD - MemTracker* mem_tracker = pool_->mem_tracker(); + MemTracker* mem_tracker = udf_pool_->mem_tracker(); if (mem_tracker->AnyLimitExceeded()) { ErrorMsg msg = ErrorMsg(TErrorCode::UDF_MEM_LIMIT_EXCEEDED, string(fn_name)); state_->SetMemLimitExceeded(mem_tracker, byte_size, &msg); @@ -306,7 +330,11 @@ inline void FunctionContextImpl::CheckMemLimit(const char* fn_name, uint8_t* FunctionContext::Allocate(int byte_size) noexcept { assert(!impl_->closed_); - uint8_t* buffer = impl_->pool_->Allocate(byte_size); +#if !defined(NDEBUG) && !defined(IMPALA_UDF_SDK_BUILD) + uint8_t* buffer = FailNextAlloc() ? 
nullptr : impl_->udf_pool_->Allocate(byte_size); +#else + uint8_t* buffer = impl_->udf_pool_->Allocate(byte_size); +#endif if (UNLIKELY(!impl_->CheckAllocResult("FunctionContext::Allocate", buffer, byte_size))) { return NULL; @@ -323,10 +351,14 @@ uint8_t* FunctionContext::Allocate(int byte_size) noexcept { uint8_t* FunctionContext::Reallocate(uint8_t* ptr, int byte_size) noexcept { assert(!impl_->closed_); - VLOG_ROW << "Reallocate: FunctionContext=" << this - << " size=" << byte_size + VLOG_ROW << "Reallocate: FunctionContext=" << this << " size=" << byte_size << " ptr=" << reinterpret_cast<void*>(ptr); - uint8_t* new_ptr = impl_->pool_->Reallocate(ptr, byte_size); +#if !defined(NDEBUG) && !defined(IMPALA_UDF_SDK_BUILD) + uint8_t* new_ptr = + FailNextAlloc() ? nullptr : impl_->udf_pool_->Reallocate(ptr, byte_size); +#else + uint8_t* new_ptr = impl_->udf_pool_->Reallocate(ptr, byte_size); +#endif if (UNLIKELY(!impl_->CheckAllocResult("FunctionContext::Reallocate", new_ptr, byte_size))) { return NULL; @@ -351,20 +383,20 @@ void FunctionContext::Free(uint8_t* buffer) noexcept { // fill in garbage value into the buffer to increase the chance of detecting misuse memset(buffer, 0xff, it->second); impl_->allocations_.erase(it); - impl_->pool_->Free(buffer); + impl_->udf_pool_->Free(buffer); } else { SetError("FunctionContext::Free() called on buffer that is already freed or was " "not allocated."); } } else { - impl_->pool_->Free(buffer); + impl_->udf_pool_->Free(buffer); } } void FunctionContext::TrackAllocation(int64_t bytes) { assert(!impl_->closed_); impl_->external_bytes_tracked_ += bytes; - impl_->pool_->mem_tracker()->Consume(bytes); + impl_->udf_pool_->mem_tracker()->Consume(bytes); impl_->CheckMemLimit("FunctionContext::TrackAllocation", bytes); } @@ -379,7 +411,7 @@ void FunctionContext::Free(int64_t bytes) { return; } impl_->external_bytes_tracked_ -= bytes; - impl_->pool_->mem_tracker()->Release(bytes); + impl_->udf_pool_->mem_tracker()->Release(bytes); } 
void FunctionContext::SetError(const char* error_msg) { @@ -435,60 +467,23 @@ void FunctionContext::SetFunctionState(FunctionStateScope scope, void* ptr) { } } -uint8_t* FunctionContextImpl::AllocateLocal(int64_t byte_size) noexcept { +uint8_t* FunctionContextImpl::AllocateForResults(int64_t byte_size) noexcept { assert(!closed_); - uint8_t* buffer = pool_->Allocate(byte_size); - if (UNLIKELY(!CheckAllocResult("FunctionContextImpl::AllocateLocal", - buffer, byte_size))) { +#if !defined(NDEBUG) && !defined(IMPALA_UDF_SDK_BUILD) + uint8_t* buffer = FailNextAlloc() ? nullptr : results_pool_->Allocate(byte_size); +#else + uint8_t* buffer = results_pool_->Allocate(byte_size); +#endif + if (UNLIKELY( + !CheckAllocResult("FunctionContextImpl::AllocateForResults", buffer, byte_size))) { return NULL; } - local_allocations_.push_back(buffer); - VLOG_ROW << "Allocate Local: FunctionContext=" << context_ + VLOG_ROW << "Allocate Results: FunctionContext=" << context_ << " size=" << byte_size << " result=" << reinterpret_cast<void*>(buffer); return buffer; } -uint8_t* FunctionContextImpl::ReallocateLocal(uint8_t* ptr, int64_t byte_size) noexcept { - assert(!closed_); - uint8_t* new_ptr = pool_->Reallocate(ptr, byte_size); - if (UNLIKELY(!CheckAllocResult("FunctionContextImpl::ReallocateLocal", - new_ptr, byte_size))) { - return NULL; - } - if (new_ptr != ptr) { - auto v = std::find(local_allocations_.rbegin(), local_allocations_.rend(), ptr); - assert(v != local_allocations_.rend()); - // Avoid perf issue; move to end of local allocations on any reallocation and - // always start the search from there. 
- if (v != local_allocations_.rbegin()) { - *v = *local_allocations_.rbegin(); - } - *local_allocations_.rbegin() = new_ptr; - } - VLOG_ROW << "Reallocate Local: FunctionContext=" << context_ - << " ptr=" << reinterpret_cast<void*>(ptr) << " size=" << byte_size - << " result=" << reinterpret_cast<void*>(new_ptr); - return new_ptr; -} - -void FunctionContextImpl::FreeLocalAllocations() noexcept { - assert(!closed_); - if (VLOG_ROW_IS_ON) { - stringstream ss; - ss << "Free local allocations: FunctionContext=" << context_ - << " pool=" << pool_ << endl; - for (int i = 0; i < local_allocations_.size(); ++i) { - ss << " " << reinterpret_cast<void*>(local_allocations_[i]) << endl; - } - VLOG_ROW << ss.str(); - } - for (int i = 0; i < local_allocations_.size(); ++i) { - pool_->Free(local_allocations_[i]); - } - local_allocations_.clear(); -} - void FunctionContextImpl::SetConstantArgs(vector<AnyVal*>&& constant_args) { constant_args_ = constant_args; } @@ -507,7 +502,7 @@ StringVal::StringVal(FunctionContext* context, int str_len) noexcept : len(str_l len = 0; is_null = true; } else { - ptr = context->impl()->AllocateLocal(str_len); + ptr = context->impl()->AllocateForResults(str_len); if (UNLIKELY(ptr == NULL && str_len > 0)) { #ifndef IMPALA_UDF_SDK_BUILD assert(!context->impl()->state()->GetQueryStatus().ok()); @@ -527,14 +522,20 @@ StringVal StringVal::CopyFrom(FunctionContext* ctx, const uint8_t* buf, size_t l } bool StringVal::Resize(FunctionContext* ctx, int new_len) noexcept { + if (new_len <= len) { + len = new_len; + return true; + } + if (UNLIKELY(new_len > StringVal::MAX_LENGTH)) { ctx->SetError("String length larger than allowed limit of 1 GB character data."); len = 0; is_null = true; return false; } - auto* new_ptr = ctx->impl()->ReallocateLocal(ptr, new_len); + auto* new_ptr = ctx->impl()->AllocateForResults(new_len); if (new_ptr != nullptr) { + memcpy(new_ptr, ptr, len); ptr = new_ptr; len = new_len; return true; 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/udf/udf.h ---------------------------------------------------------------------- diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index 9d77601..3d04115 100644 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -347,15 +347,15 @@ typedef void (*UdfClose)(FunctionContext* context, /// intermediate type should be string and the UDA can cast the ptr to the structure /// it is using. /// -/// Memory Management: For allocations that are not returned to Impala, the UDA should use -/// the FunctionContext::Allocate()/Free() methods. In general, Allocate() is called in -/// Init(), and then Free() must be called in both Serialize() and Finalize(), since -/// either of these functions may be called to clean up the state. For StringVal -/// allocations returned to Impala (e.g. returned by UdaSerialize()), the UDA should -/// allocate the result via StringVal(FunctionContext*, int) ctor or the function -/// StringVal::CopyFrom(FunctionContext*, const uint8_t*, size_t) and Impala will -/// automatically handle freeing it. -// +/// Memory Management: allocations that are referred to by the intermediate values +/// returned by Init(), Update() and Merge() must be allocated via +/// FunctionContext::Allocate() and freed via FunctionContext::Free(). Both Serialize() +/// and Finalize() are responsible for cleaning up the intermediate value and freeing +/// such allocations. StringVals returned to Impala directly by Serialize(), Finalize() +/// or GetValue() should be backed by temporary results memory allocated via the +/// StringVal(FunctionContext*, int) ctor, StringVal::CopyFrom(FunctionContext*, +/// const uint8_t*, size_t), or StringVal::Resize(). +/// /// Note that in the rare case the StringVal ctor or StringVal::CopyFrom() fail to /// allocate memory, the StringVal object will be marked as a null string. 
/// Serialize()/Finalize() should handle allocation failures by checking the is_null @@ -609,17 +609,21 @@ struct StringVal : public AnyVal { /// large, the constructor will construct a NULL string and set an error on the function /// context. /// - /// The memory backing this StringVal is a local allocation, and so doesn't need + /// The memory backing this StringVal is managed by the Impala runtime and so doesn't need /// to be explicitly freed. StringVal(FunctionContext* context, int len) NOEXCEPT; - /// Reallocate a StringVal that is backed by a local allocation so that it as - /// at least as large as len. May shrink or / expand the string. If the - /// string is expanded, the content of the new space is undefined. + /// Resize a string value to 'len'. If 'len' is the same as or smaller than the current + /// length, truncates the string. Otherwise, increases the string's length, allocating + /// new memory and copying over the current contents if needed. The content of the new + /// space is undefined. If a resize fails because memory could not be allocated, the + /// length and contents of the StringVal are unchanged. If 'len' exceeds + /// StringVal::MAX_LENGTH, an error is set on 'context' and the StringVal becomes a + /// NULL string of length 0. /// - /// If the resize fails, the original StringVal remains in place. Callers do not - /// otherwise need to be concerned with backing storage, which is allocated from a - /// local allocation. + /// Resized strings can be returned from UDFs as the result value. Callers do not + /// otherwise need to be concerned with backing storage, which is managed by the + /// Impala runtime and freed at some point after the UDF returns. /// /// Returns true on success, false on failure.
bool Resize(FunctionContext* context, int len) NOEXCEPT; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/util/tuple-row-compare.cc ---------------------------------------------------------------------- diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc index 1784893..9a012ca 100644 --- a/be/src/util/tuple-row-compare.cc +++ b/be/src/util/tuple-row-compare.cc @@ -30,17 +30,17 @@ using namespace impala; using namespace llvm; using namespace strings; -Status TupleRowComparator::Open( - ObjectPool* pool, RuntimeState* state, MemPool* expr_mem_pool) { +Status TupleRowComparator::Open(ObjectPool* pool, RuntimeState* state, + MemPool* expr_perm_pool, MemPool* expr_results_pool) { if (ordering_expr_evals_lhs_.empty()) { RETURN_IF_ERROR(ScalarExprEvaluator::Create(ordering_exprs_, state, pool, - expr_mem_pool, &ordering_expr_evals_lhs_)); + expr_perm_pool, expr_results_pool, &ordering_expr_evals_lhs_)); RETURN_IF_ERROR(ScalarExprEvaluator::Open(ordering_expr_evals_lhs_, state)); } DCHECK_EQ(ordering_exprs_.size(), ordering_expr_evals_lhs_.size()); if (ordering_expr_evals_rhs_.empty()) { - RETURN_IF_ERROR(ScalarExprEvaluator::Clone(pool, state, expr_mem_pool, - ordering_expr_evals_lhs_, &ordering_expr_evals_rhs_)); + RETURN_IF_ERROR(ScalarExprEvaluator::Clone(pool, state, expr_perm_pool, + expr_results_pool, ordering_expr_evals_lhs_, &ordering_expr_evals_rhs_)); } DCHECK_EQ(ordering_expr_evals_lhs_.size(), ordering_expr_evals_rhs_.size()); return Status::OK(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/util/tuple-row-compare.h ---------------------------------------------------------------------- diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h index 2db5604..e1933d5 100644 --- a/be/src/util/tuple-row-compare.h +++ b/be/src/util/tuple-row-compare.h @@ -74,10 +74,12 @@ class TupleRowComparator { for (bool null_first : nulls_first) 
nulls_first_.push_back(null_first ? -1 : 1); } - /// Create the evaluators for the ordering expressions and store them in 'pool'. Any - /// allocation during initialization of the evaluators will come from 'expr_mem_pool'. - /// 'state' is passed in for initialization of the evaluator. - Status Open(ObjectPool* pool, RuntimeState* state, MemPool* expr_mem_pool); + /// Create the evaluators for the ordering expressions and store them in 'pool'. The + /// evaluators use 'expr_perm_pool' and 'expr_results_pool' for permanent and result + /// allocations made by exprs respectively. 'state' is passed in for initialization + /// of the evaluator. + Status Open(ObjectPool* pool, RuntimeState* state, MemPool* expr_perm_pool, + MemPool* expr_results_pool); /// Release resources held by the ordering expressions' evaluators. void Close(RuntimeState* state); @@ -111,12 +113,6 @@ class TupleRowComparator { return Less(lhs_row, rhs_row); } - /// Free any local allocations made during expression evaluations in Compare(). - void FreeLocalAllocations() const { - ScalarExprEvaluator::FreeLocalAllocations(ordering_expr_evals_lhs_); - ScalarExprEvaluator::FreeLocalAllocations(ordering_expr_evals_rhs_); - } - private: /// Interpreted implementation of Compare(). int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test b/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test index fed59b9..fe4fa87 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test +++ b/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test @@ -23,12 +23,12 @@ FunctionContext::Allocate() failed to allocate 4 bytes. 
---- QUERY select extract(year from timestamp_col) from functional.alltypes limit 10 ---- CATCH -FunctionContextImpl::AllocateLocal() failed to allocate 4 bytes. +FunctionContextImpl::AllocateForResults() failed to allocate 4 bytes. ==== ---- QUERY select trunc(timestamp_col, 'YEAR') from functional.alltypes limit 10 ---- CATCH -FunctionContextImpl::AllocateLocal() failed to allocate 4 bytes. +FunctionContextImpl::AllocateForResults() failed to allocate 4 bytes. ==== ---- QUERY select first_value(string_col) over (partition by month order by year) from functional.alltypes @@ -53,7 +53,7 @@ FunctionContext::Allocate() failed to allocate 16 bytes. ---- QUERY select cast(string_col as char(120)) from functional.alltypes ---- CATCH -FunctionContextImpl::AllocateLocal() failed to allocate 120 bytes. +FunctionContextImpl::AllocateForResults() failed to allocate 120 bytes. ==== ---- QUERY select appx_median(int_col) from functional.alltypes http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/tests/custom_cluster/test_alloc_fail.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_alloc_fail.py b/tests/custom_cluster/test_alloc_fail.py index f1ae3dc..888dd9a 100644 --- a/tests/custom_cluster/test_alloc_fail.py +++ b/tests/custom_cluster/test_alloc_fail.py @@ -29,7 +29,7 @@ class TestAllocFail(CustomClusterTestSuite): return 'functional-query' @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--stress_free_pool_alloc=1") + @CustomClusterTestSuite.with_args("--stress_fn_ctx_alloc=1") def test_alloc_fail_init(self, vector): self.run_test_case('QueryTest/alloc-fail-init', vector) @@ -37,7 +37,7 @@ class TestAllocFail(CustomClusterTestSuite): @pytest.mark.xfail(run=True, reason="IMPALA-2925: the execution is not deterministic " "so some tests sometimes don't fail as expected") @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--stress_free_pool_alloc=3") + 
@CustomClusterTestSuite.with_args("--stress_fn_ctx_alloc=3") def test_alloc_fail_update(self, vector, unique_database): # Note that this test relies on pre-aggregation to exercise the Serialize() path so # query option 'num_nodes' must not be 1. CustomClusterTestSuite.add_test_dimensions()
