http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memcmpable_varint.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memcmpable_varint.h b/be/src/kudu/util/memcmpable_varint.h new file mode 100644 index 0000000..c1ce071 --- /dev/null +++ b/be/src/kudu/util/memcmpable_varint.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// This is an alternate varint format, borrowed from sqlite4, that differs from the +// varint in util/coding.h in that its serialized form can be compared with memcmp(), +// yielding the same result as comparing the original integers. +// +// The serialized form also has the property that multiple such varints can be strung +// together to form a composite key, which itself is memcmpable. +// +// See memcmpable_varint.cc for further description. + +#ifndef KUDU_UTIL_MEMCMPABLE_VARINT_H +#define KUDU_UTIL_MEMCMPABLE_VARINT_H + +#include "kudu/util/faststring.h" +#include "kudu/util/slice.h" + +namespace kudu { + +void PutMemcmpableVarint64(faststring *dst, uint64_t value); + +// Standard Get... routines parse a value from the beginning of a Slice +// and advance the slice past the parsed value. +bool GetMemcmpableVarint64(Slice *input, uint64_t *value); + +} // namespace kudu + +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/arena-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/arena-test.cc b/be/src/kudu/util/memory/arena-test.cc new file mode 100644 index 0000000..fc3a64d --- /dev/null +++ b/be/src/kudu/util/memory/arena-test.cc @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <memory> +#include <thread> +#include <vector> + +#include "kudu/gutil/stringprintf.h" +#include "kudu/util/memory/arena.h" +#include "kudu/util/memory/memory.h" +#include "kudu/util/mem_tracker.h" + +DEFINE_int32(num_threads, 16, "Number of threads to test"); +DEFINE_int32(allocs_per_thread, 10000, "Number of allocations each thread should do"); +DEFINE_int32(alloc_size, 4, "number of bytes in each allocation"); + +namespace kudu { + +using std::shared_ptr; +using std::thread; +using std::vector; + +template<class ArenaType> +static void AllocateThread(ArenaType *arena, uint8_t thread_index) { + std::vector<void *> ptrs; + ptrs.reserve(FLAGS_allocs_per_thread); + + char buf[FLAGS_alloc_size]; + memset(buf, thread_index, FLAGS_alloc_size); + + for (int i = 0; i < FLAGS_allocs_per_thread; i++) { + void *alloced = arena->AllocateBytes(FLAGS_alloc_size); + CHECK(alloced); + memcpy(alloced, buf, FLAGS_alloc_size); + ptrs.push_back(alloced); + } + + for (void *p : ptrs) { + if (memcmp(buf, p, FLAGS_alloc_size) != 0) { + FAIL() << StringPrintf("overwritten pointer at %p", p); + } + } +} + +// Non-templated function to forward to above -- simplifies thread creation +static void AllocateThreadTSArena(ThreadSafeArena *arena, uint8_t thread_index) { + AllocateThread(arena, thread_index); +} + + +TEST(TestArena, TestSingleThreaded) { + Arena arena(128, 128); + + AllocateThread(&arena, 0); +} + + + +TEST(TestArena, TestMultiThreaded) { + CHECK(FLAGS_num_threads < 256); + + ThreadSafeArena arena(1024, 1024); + + vector<thread> threads; + for (uint8_t i = 0; i < FLAGS_num_threads; i++) { + threads.emplace_back(AllocateThreadTSArena, &arena, (uint8_t)i); + } + + for (thread& thr : threads) { + thr.join(); + } +} + +TEST(TestArena, TestAlignment) { + + ThreadSafeArena arena(1024, 1024); + for (int i = 0; i < 1000; i++) { + int alignment = 1 << (1 % 5); + + void *ret = arena.AllocateBytesAligned(5, alignment); + ASSERT_EQ(0, (uintptr_t)(ret) % alignment) << + "failed to align on " << alignment << "b boundary: " << + ret; + } +} + +TEST(TestArena, TestObjectAlignment) { + struct MyStruct { + int64_t v; + }; + Arena a(256, 256 * 1024); + // Allocate a junk byte to ensure that the next allocation isn't "accidentally" aligned. + a.AllocateBytes(1); + void* v = a.NewObject<MyStruct>(); + ASSERT_EQ(reinterpret_cast<uintptr_t>(v) % alignof(MyStruct), 0); +} + + +// MemTrackers update their ancestors when consuming and releasing memory to compute +// usage totals. However, the lifetimes of parent and child trackers can be different. +// Validate that child trackers can still correctly update their parent stats even when +// the parents go out of scope. +TEST(TestArena, TestMemoryTrackerParentReferences) { + // Set up a parent and child MemTracker. + const string parent_id = "parent-id"; + const string child_id = "child-id"; + shared_ptr<MemTracker> child_tracker; + { + shared_ptr<MemTracker> parent_tracker = MemTracker::CreateTracker(1024, parent_id); + child_tracker = MemTracker::CreateTracker(-1, child_id, parent_tracker); + // Parent falls out of scope here. Should still be owned by the child. + } + shared_ptr<MemoryTrackingBufferAllocator> allocator( + new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), child_tracker)); + MemoryTrackingArena arena(256, 1024, allocator); + + // Try some child operations. + ASSERT_EQ(256, child_tracker->consumption()); + void *allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(256, child_tracker->consumption()); + allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(768, child_tracker->consumption()); +} + +TEST(TestArena, TestMemoryTrackingDontEnforce) { + shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker"); + shared_ptr<MemoryTrackingBufferAllocator> allocator( + new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker)); + MemoryTrackingArena arena(256, 1024, allocator); + ASSERT_EQ(256, mem_tracker->consumption()); + void *allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(256, mem_tracker->consumption()); + allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(768, mem_tracker->consumption()); + + // In DEBUG mode after Reset() the last component of an arena is + // cleared, but is then created again; in release mode, the last + // component is not cleared. In either case, after Reset() + // consumption() should equal the size of the last component which + // is 512 bytes. + arena.Reset(); + ASSERT_EQ(512, mem_tracker->consumption()); + + // Allocate beyond allowed consumption. This should still go + // through, since enforce_limit is false. + allocated = arena.AllocateBytes(1024); + ASSERT_TRUE(allocated); + + ASSERT_EQ(1536, mem_tracker->consumption()); +} + +TEST(TestArena, TestMemoryTrackingEnforced) { + shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker"); + shared_ptr<MemoryTrackingBufferAllocator> allocator( + new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker, + // enforce limit + true)); + MemoryTrackingArena arena(256, 1024, allocator); + ASSERT_EQ(256, mem_tracker->consumption()); + void *allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(256, mem_tracker->consumption()); + allocated = arena.AllocateBytes(1024); + ASSERT_FALSE(allocated); + ASSERT_EQ(256, mem_tracker->consumption()); +} + +TEST(TestArena, TestSTLAllocator) { + Arena a(256, 256 * 1024); + typedef vector<int, ArenaAllocator<int, false> > ArenaVector; + ArenaAllocator<int, false> alloc(&a); + ArenaVector v(alloc); + for (int i = 0; i < 10000; i++) { + v.push_back(i); + } + for (int i = 0; i < 10000; i++) { + ASSERT_EQ(i, v[i]); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/arena.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/arena.cc b/be/src/kudu/util/memory/arena.cc new file mode 100644 index 0000000..427072f --- /dev/null +++ b/be/src/kudu/util/memory/arena.cc @@ -0,0 +1,161 @@ +// Copyright 2010 Google Inc. All Rights Reserved +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +#include "kudu/util/memory/arena.h" + +#include <algorithm> +#include <mutex> + +#include "kudu/util/debug-util.h" +#include "kudu/util/flag_tags.h" + +using std::copy; +using std::max; +using std::min; +using std::reverse; +using std::sort; +using std::swap; +using std::unique_ptr; + +namespace kudu { + +template <bool THREADSAFE> +const size_t ArenaBase<THREADSAFE>::kMinimumChunkSize = 16; + +template <bool THREADSAFE> +ArenaBase<THREADSAFE>::ArenaBase( + BufferAllocator* const buffer_allocator, + size_t initial_buffer_size, + size_t max_buffer_size) + : buffer_allocator_(buffer_allocator), + max_buffer_size_(max_buffer_size), + arena_footprint_(0) { + AddComponent(CHECK_NOTNULL(NewComponent(initial_buffer_size, 0))); +} + +template <bool THREADSAFE> +ArenaBase<THREADSAFE>::ArenaBase(size_t initial_buffer_size, size_t max_buffer_size) + : buffer_allocator_(HeapBufferAllocator::Get()), + max_buffer_size_(max_buffer_size), + arena_footprint_(0) { + AddComponent(CHECK_NOTNULL(NewComponent(initial_buffer_size, 0))); +} + +template <bool THREADSAFE> +void *ArenaBase<THREADSAFE>::AllocateBytesFallback(const size_t size, const size_t align) { + std::lock_guard<mutex_type> lock(component_lock_); + + // It's possible another thread raced with us and already allocated + // a new component, in which case we should try the "fast path" again + Component* cur = AcquireLoadCurrent(); + void * result = cur->AllocateBytesAligned(size, align); + if (PREDICT_FALSE(result != nullptr)) return result; + + // Really need to allocate more space. + size_t next_component_size = min(2 * cur->size(), max_buffer_size_); + // But, allocate enough, even if the request is large. In this case, + // might violate the max_element_size bound. + if (next_component_size < size) { + next_component_size = size; + } + // If soft quota is exhausted we will only get the "minimal" amount of memory + // we ask for. In this case if we always use "size" as minimal, we may degrade + // to allocating a lot of tiny components, one for each string added to the + // arena. This would be very inefficient, so let's first try something between + // "size" and "next_component_size". If it fails due to hard quota being + // exhausted, we'll fall back to using "size" as minimal. + size_t minimal = (size + next_component_size) / 2; + CHECK_LE(size, minimal); + CHECK_LE(minimal, next_component_size); + // Now, just make sure we can actually get the memory. + Component* component = NewComponent(next_component_size, minimal); + if (component == nullptr) { + component = NewComponent(next_component_size, size); + } + if (!component) return nullptr; + + // Now, must succeed. The component has at least 'size' bytes. + result = component->AllocateBytesAligned(size, align); + CHECK(result != nullptr); + + // Now add it to the arena. + AddComponent(component); + + return result; +} + +template <bool THREADSAFE> +typename ArenaBase<THREADSAFE>::Component* ArenaBase<THREADSAFE>::NewComponent( + size_t requested_size, + size_t minimum_size) { + Buffer* buffer = buffer_allocator_->BestEffortAllocate(requested_size, + minimum_size); + if (buffer == nullptr) return nullptr; + + CHECK_EQ(reinterpret_cast<uintptr_t>(buffer->data()) & (16 - 1), 0) + << "Components should be 16-byte aligned: " << buffer->data(); + + ASAN_POISON_MEMORY_REGION(buffer->data(), buffer->size()); + + return new Component(buffer); +} + +// LOCKING: component_lock_ must be held by the current thread. +template <bool THREADSAFE> +void ArenaBase<THREADSAFE>::AddComponent(ArenaBase::Component *component) { + ReleaseStoreCurrent(component); + arena_.push_back(unique_ptr<Component>(component)); + arena_footprint_ += component->size(); +} + +template <bool THREADSAFE> +void ArenaBase<THREADSAFE>::Reset() { + std::lock_guard<mutex_type> lock(component_lock_); + + if (PREDICT_FALSE(arena_.size() > 1)) { + unique_ptr<Component> last = std::move(arena_.back()); + arena_.clear(); + arena_.emplace_back(std::move(last)); + ReleaseStoreCurrent(arena_[0].get()); + } + arena_.back()->Reset(); + arena_footprint_ = arena_.back()->size(); + +#ifndef NDEBUG + // In debug mode release the last component too for (hopefully) better + // detection of memory-related bugs (invalid shallow copies, etc.). + size_t last_size = arena_.back()->size(); + arena_.clear(); + AddComponent(CHECK_NOTNULL(NewComponent(last_size, 0))); + arena_footprint_ = 0; +#endif +} + +template <bool THREADSAFE> +size_t ArenaBase<THREADSAFE>::memory_footprint() const { + std::lock_guard<mutex_type> lock(component_lock_); + return arena_footprint_; +} + +// Explicit instantiation. +template class ArenaBase<true>; +template class ArenaBase<false>; + + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/arena.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/arena.h b/be/src/kudu/util/memory/arena.h new file mode 100644 index 0000000..1c98564 --- /dev/null +++ b/be/src/kudu/util/memory/arena.h @@ -0,0 +1,473 @@ +// Copyright 2010 Google Inc. All Rights Reserved +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// +// Memory arena for variable-length datatypes and STL collections. + +#ifndef KUDU_UTIL_MEMORY_ARENA_H_ +#define KUDU_UTIL_MEMORY_ARENA_H_ + +#include <boost/signals2/dummy_mutex.hpp> +#include <glog/logging.h> +#include <memory> +#include <mutex> +#include <new> +#include <stddef.h> +#include <string.h> +#include <vector> + +#include "kudu/gutil/dynamic_annotations.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/logging-inl.h" +#include "kudu/gutil/macros.h" +#include "kudu/util/alignment.h" +#include "kudu/util/locks.h" +#include "kudu/util/memory/memory.h" +#include "kudu/util/mutex.h" +#include "kudu/util/slice.h" + +using std::allocator; + +namespace kudu { + +template<bool THREADSAFE> struct ArenaTraits; + +template <> struct ArenaTraits<true> { + typedef Atomic32 offset_type; + typedef Mutex mutex_type; + typedef simple_spinlock spinlock_type; +}; + +template <> struct ArenaTraits<false> { + typedef uint32_t offset_type; + // For non-threadsafe, we don't need any real locking. + typedef boost::signals2::dummy_mutex mutex_type; + typedef boost::signals2::dummy_mutex spinlock_type; +}; + +// A helper class for storing variable-length blobs (e.g. strings). Once a blob +// is added to the arena, its index stays fixed. No reallocation happens. +// Instead, the arena keeps a list of buffers. When it needs to grow, it +// allocates a new buffer. Each subsequent buffer is 2x larger, than its +// predecessor, until the maximum specified buffer size is reached. +// The buffers are furnished by a designated allocator. +// +// This class is thread-safe with the fast path lock-free. +template <bool THREADSAFE> +class ArenaBase { + public: + // Arenas are required to have a minimum size of at least this amount. + static const size_t kMinimumChunkSize; + + // Creates a new arena, with a single buffer of size up-to + // initial_buffer_size, upper size limit for later-allocated buffers capped + // at max_buffer_size, and maximum capacity (i.e. total sizes of all buffers) + // possibly limited by the buffer allocator. The allocator might cap the + // initial allocation request arbitrarily (down to zero). As a consequence, + // arena construction never fails due to OOM. + // + // Calls to AllocateBytes() will then give out bytes from the working buffer + // until it is exhausted. Then, a subsequent working buffer will be allocated. + // The size of the next buffer is normally 2x the size of the previous buffer. + // It might be capped by the allocator, or by the max_buffer_size parameter. + ArenaBase(BufferAllocator* const buffer_allocator, + size_t initial_buffer_size, + size_t max_buffer_size); + + // Creates an arena using a default (heap) allocator with unbounded capacity. + // Discretion advised. + ArenaBase(size_t initial_buffer_size, size_t max_buffer_size); + + // Adds content of the specified Slice to the arena, and returns a + // pointer to it. The pointer is guaranteed to remain valid during the + // lifetime of the arena. The Slice object itself is not copied. The + // size information is not stored. + // (Normal use case is that the caller already has an array of Slices, + // where it keeps these pointers together with size information). + // If this request would make the arena grow and the allocator denies that, + // returns NULL and leaves the arena unchanged. + uint8_t *AddSlice(const Slice& value); + + // Same as above. + void * AddBytes(const void *data, size_t len); + + // Handy wrapper for placement-new. + // + // This ensures that the returned object is properly aligned based on + // alignof(T). + template<class T, typename ... Args> + T *NewObject(Args&&... args); + + // Relocate the given Slice into the arena, setting 'dst' and + // returning true if successful. + // It is legal for 'dst' to be a pointer to 'src'. + // See AddSlice above for detail on memory lifetime. + bool RelocateSlice(const Slice &src, Slice *dst); + + // Similar to the above, but for StringPiece. + bool RelocateStringPiece(const StringPiece& src, StringPiece* sp); + + // Reserves a blob of the specified size in the arena, and returns a pointer + // to it. The caller can then fill the allocated memory. The pointer is + // guaranteed to remain valid during the lifetime of the arena. + // If this request would make the arena grow and the allocator denies that, + // returns NULL and leaves the arena unchanged. + void* AllocateBytes(const size_t size) { + return AllocateBytesAligned(size, 1); + } + + // Allocate bytes, ensuring a specified alignment. + // NOTE: alignment MUST be a power of two, or else this will break. + void* AllocateBytesAligned(const size_t size, const size_t alignment); + + // Removes all data from the arena. (Invalidates all pointers returned by + // AddSlice and AllocateBytes). Does not cause memory allocation. + // May reduce memory footprint, as it discards all allocated buffers but + // the last one. + // Unless allocations exceed max_buffer_size, repetitive filling up and + // resetting normally lead to quickly settling memory footprint and ceasing + // buffer allocations, as the arena keeps reusing a single, large buffer. + void Reset(); + + // Returns the memory footprint of this arena, in bytes, defined as a sum of + // all buffer sizes. Always greater or equal to the total number of + // bytes allocated out of the arena. + size_t memory_footprint() const; + + private: + typedef typename ArenaTraits<THREADSAFE>::mutex_type mutex_type; + // Encapsulates a single buffer in the arena. + class Component; + + // Fallback for AllocateBytes non-fast-path + void* AllocateBytesFallback(const size_t size, const size_t align); + + Component* NewComponent(size_t requested_size, size_t minimum_size); + void AddComponent(Component *component); + + // Load the current component, with "Acquire" semantics (see atomicops.h) + // if the arena is meant to be thread-safe. + inline Component* AcquireLoadCurrent() { + if (THREADSAFE) { + return reinterpret_cast<Component*>( + base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(¤t_))); + } else { + return current_; + } + } + + // Store the current component, with "Release" semantics (see atomicops.h) + // if the arena is meant to be thread-safe. + inline void ReleaseStoreCurrent(Component* c) { + if (THREADSAFE) { + base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(¤t_), + reinterpret_cast<AtomicWord>(c)); + } else { + current_ = c; + } + } + + BufferAllocator* const buffer_allocator_; + vector<std::unique_ptr<Component> > arena_; + + // The current component to allocate from. + // Use AcquireLoadCurrent and ReleaseStoreCurrent to load/store. + Component* current_; + const size_t max_buffer_size_; + size_t arena_footprint_; + + // Lock covering 'slow path' allocation, when new components are + // allocated and added to the arena's list. Also covers any other + // mutation of the component data structure (eg Reset). + mutable mutex_type component_lock_; + + DISALLOW_COPY_AND_ASSIGN(ArenaBase); +}; + +// STL-compliant allocator, for use with hash_maps and other structures +// which share lifetime with an Arena. Enables memory control and improves +// performance. +template<class T, bool THREADSAFE> class ArenaAllocator { + public: + typedef T value_type; + typedef size_t size_type; + typedef ptrdiff_t difference_type; + + typedef T* pointer; + typedef const T* const_pointer; + typedef T& reference; + typedef const T& const_reference; + pointer index(reference r) const { return &r; } + const_pointer index(const_reference r) const { return &r; } + size_type max_size() const { return size_t(-1) / sizeof(T); } + + explicit ArenaAllocator(ArenaBase<THREADSAFE>* arena) : arena_(arena) { + CHECK_NOTNULL(arena_); + } + + ~ArenaAllocator() { } + + pointer allocate(size_type n, allocator<void>::const_pointer /*hint*/ = 0) { + return reinterpret_cast<T*>(arena_->AllocateBytes(n * sizeof(T))); + } + + void deallocate(pointer p, size_type n) {} + + void construct(pointer p, const T& val) { + new(reinterpret_cast<void*>(p)) T(val); + } + + void destroy(pointer p) { p->~T(); } + + template<class U> struct rebind { + typedef ArenaAllocator<U, THREADSAFE> other; + }; + + template<class U, bool TS> ArenaAllocator(const ArenaAllocator<U, TS>& other) + : arena_(other.arena()) { } + + template<class U, bool TS> bool operator==(const ArenaAllocator<U, TS>& other) const { + return arena_ == other.arena(); + } + + template<class U, bool TS> bool operator!=(const ArenaAllocator<U, TS>& other) const { + return arena_ != other.arena(); + } + + ArenaBase<THREADSAFE> *arena() const { + return arena_; + } + + private: + + ArenaBase<THREADSAFE>* arena_; +}; + + +class Arena : public ArenaBase<false> { + public: + explicit Arena(size_t initial_buffer_size, size_t max_buffer_size) : + ArenaBase<false>(initial_buffer_size, max_buffer_size) + {} +}; + +class ThreadSafeArena : public ArenaBase<true> { + public: + explicit ThreadSafeArena(size_t initial_buffer_size, size_t max_buffer_size) : + ArenaBase<true>(initial_buffer_size, max_buffer_size) + {} +}; + +// Arena implementation that is integrated with MemTracker in order to +// track heap-allocated space consumed by the arena. + +class MemoryTrackingArena : public ArenaBase<false> { + public: + + MemoryTrackingArena( + size_t initial_buffer_size, + size_t max_buffer_size, + const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator) + : ArenaBase<false>(tracking_allocator.get(), initial_buffer_size, max_buffer_size), + tracking_allocator_(tracking_allocator) {} + + ~MemoryTrackingArena() { + } + + private: + + // This is required in order for the Arena to survive even after tablet is shut down, + // e.g., in the case of Scanners running scanners (see tablet_server-test.cc) + std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_; +}; + +class ThreadSafeMemoryTrackingArena : public ArenaBase<true> { + public: + + ThreadSafeMemoryTrackingArena( + size_t initial_buffer_size, + size_t max_buffer_size, + const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator) + : ArenaBase<true>(tracking_allocator.get(), initial_buffer_size, max_buffer_size), + tracking_allocator_(tracking_allocator) {} + + ~ThreadSafeMemoryTrackingArena() { + } + + private: + + // See comment in MemoryTrackingArena above. + std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_; +}; + +// Implementation of inline and template methods + +template<bool THREADSAFE> +class ArenaBase<THREADSAFE>::Component { + public: + explicit Component(Buffer* buffer) + : buffer_(buffer), + data_(static_cast<uint8_t*>(buffer->data())), + offset_(0), + size_(buffer->size()) {} + + // Tries to reserve space in this component. Returns the pointer to the + // reserved space if successful; NULL on failure (if there's no more room). + uint8_t* AllocateBytes(const size_t size) { + return AllocateBytesAligned(size, 1); + } + + uint8_t *AllocateBytesAligned(const size_t size, const size_t alignment); + + size_t size() const { return size_; } + void Reset() { + ASAN_POISON_MEMORY_REGION(data_, size_); + offset_ = 0; + } + + private: + // Mark the given range unpoisoned in ASAN. + // This is a no-op in a non-ASAN build. + void AsanUnpoison(const void* addr, size_t size); + + gscoped_ptr<Buffer> buffer_; + uint8_t* const data_; + typename ArenaTraits<THREADSAFE>::offset_type offset_; + const size_t size_; + +#ifdef ADDRESS_SANITIZER + // Lock used around unpoisoning memory when ASAN is enabled. + // ASAN does not support concurrent unpoison calls that may overlap a particular + // memory word (8 bytes). + typedef typename ArenaTraits<THREADSAFE>::spinlock_type spinlock_type; + spinlock_type asan_lock_; +#endif + DISALLOW_COPY_AND_ASSIGN(Component); +}; + + +// Thread-safe implementation +template <> +inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned( + const size_t size, const size_t alignment) { + // Special case check the allowed alignments. Currently, we only ensure + // the allocated buffer components are 16-byte aligned, and the code path + // doesn't support larger alignment. + DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || + alignment == 8 || alignment == 16) + << "bad alignment: " << alignment; + retry: + Atomic32 offset = Acquire_Load(&offset_); + + Atomic32 aligned = KUDU_ALIGN_UP(offset, alignment); + Atomic32 new_offset = aligned + size; + + if (PREDICT_TRUE(new_offset <= size_)) { + bool success = Acquire_CompareAndSwap(&offset_, offset, new_offset) == offset; + if (PREDICT_TRUE(success)) { + AsanUnpoison(data_ + aligned, size); + return data_ + aligned; + } else { + // Raced with another allocator + goto retry; + } + } else { + return NULL; + } +} + +// Non-Threadsafe implementation +template <> +inline uint8_t *ArenaBase<false>::Component::AllocateBytesAligned( + const size_t size, const size_t alignment) { + DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || + alignment == 8 || alignment == 16) + << "bad alignment: " << alignment; + size_t aligned = KUDU_ALIGN_UP(offset_, alignment); + uint8_t* destination = data_ + aligned; + size_t save_offset = offset_; + offset_ = aligned + size; + if (PREDICT_TRUE(offset_ <= size_)) { + AsanUnpoison(data_ + aligned, size); + return destination; + } else { + offset_ = save_offset; + return NULL; + } +} + +template <bool THREADSAFE> +inline void ArenaBase<THREADSAFE>::Component::AsanUnpoison(const void* addr, size_t size) { +#ifdef ADDRESS_SANITIZER + std::lock_guard<spinlock_type> l(asan_lock_); + ASAN_UNPOISON_MEMORY_REGION(addr, size); +#endif +} + +// Fast-path allocation should get inlined, and fall-back +// to non-inline function call for allocation failure +template <bool THREADSAFE> +inline void *ArenaBase<THREADSAFE>::AllocateBytesAligned(const size_t size, const size_t align) { + void* result = AcquireLoadCurrent()->AllocateBytesAligned(size, align); + if (PREDICT_TRUE(result != NULL)) return result; + return AllocateBytesFallback(size, align); +} + +template <bool THREADSAFE> +inline uint8_t* ArenaBase<THREADSAFE>::AddSlice(const Slice& value) { + return reinterpret_cast<uint8_t *>(AddBytes(value.data(), value.size())); +} + +template <bool THREADSAFE> +inline void *ArenaBase<THREADSAFE>::AddBytes(const void *data, size_t len) { + void* destination = AllocateBytes(len); + if (destination == NULL) return NULL; + memcpy(destination, data, len); + return destination; +} + +template <bool THREADSAFE> +inline bool ArenaBase<THREADSAFE>::RelocateSlice(const Slice &src, Slice *dst) { + void* destination = AllocateBytes(src.size()); + if (destination == NULL) return false; + memcpy(destination, src.data(), src.size()); + *dst = Slice(reinterpret_cast<uint8_t *>(destination), src.size()); + return true; +} + + +template <bool THREADSAFE> +inline bool ArenaBase<THREADSAFE>::RelocateStringPiece(const StringPiece& src, StringPiece* sp) { + Slice slice(src.data(), src.size()); + if (!RelocateSlice(slice, &slice)) return false; + *sp = StringPiece(reinterpret_cast<const char*>(slice.data()), slice.size()); + return true; +} + +template<bool THREADSAFE> +template<class T, class ... Args> +inline T *ArenaBase<THREADSAFE>::NewObject(Args&&... args) { + void *mem = AllocateBytesAligned(sizeof(T), alignof(T)); + if (mem == NULL) throw std::bad_alloc(); + return new (mem) T(std::forward<Args>(args)...); +} + +} // namespace kudu + +#endif // KUDU_UTIL_MEMORY_ARENA_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/memory.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/memory.cc b/be/src/kudu/util/memory/memory.cc new file mode 100644 index 0000000..9db0464 --- /dev/null +++ b/be/src/kudu/util/memory/memory.cc @@ -0,0 +1,338 @@ +// Copyright 2010 Google Inc. All Rights Reserved +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "kudu/util/memory/memory.h" + +#include <string.h> + +#include <algorithm> +#include <cstdlib> + +#include <gflags/gflags.h> + +#include "kudu/util/alignment.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/memory/overwrite.h" +#include "kudu/util/mem_tracker.h" + +using std::copy; +using std::min; + +// TODO(onufry) - test whether the code still tests OK if we set this to true, +// or remove this code and add a test that Google allocator does not change it's +// contract - 16-aligned in -c opt and %16 == 8 in debug. +DEFINE_bool(allocator_aligned_mode, false, + "Use 16-byte alignment instead of 8-byte, " + "unless explicitly specified otherwise - to boost SIMD"); +TAG_FLAG(allocator_aligned_mode, hidden); + +namespace kudu { + +namespace { +static char dummy_buffer[0] = {}; +} + +Buffer::~Buffer() { +#if !defined(NDEBUG) && !defined(ADDRESS_SANITIZER) + // "unrolling" the string "BAD" makes for a much more efficient + // OverwriteWithPattern call in debug mode, so we can keep this + // useful bit of code without tests going slower! + // + // In ASAN mode, we don't bother with this, because when we free the memory, ASAN will + // prevent us from accessing it anyway. + OverwriteWithPattern(reinterpret_cast<char*>(data_), size_, + "BADBADBADBADBADBADBADBADBADBADBAD" + "BADBADBADBADBADBADBADBADBADBADBAD" + "BADBADBADBADBADBADBADBADBADBADBAD"); +#endif + if (allocator_ != nullptr) allocator_->FreeInternal(this); +} + +void BufferAllocator::LogAllocation(size_t requested, + size_t minimal, + Buffer* buffer) { + if (buffer == nullptr) { + LOG(WARNING) << "Memory allocation failed. " + << "Number of bytes requested: " << requested + << ", minimal: " << minimal; + return; + } + if (buffer->size() < requested) { + LOG(WARNING) << "Memory allocation was shorter than requested. " + << "Number of bytes requested to allocate: " << requested + << ", minimal: " << minimal + << ", and actually allocated: " << buffer->size(); + } +} + +HeapBufferAllocator::HeapBufferAllocator() + : aligned_mode_(FLAGS_allocator_aligned_mode) { +} + +Buffer* HeapBufferAllocator::AllocateInternal( + const size_t requested, + const size_t minimal, + BufferAllocator* const originator) { + DCHECK_LE(minimal, requested); + void* data; + size_t attempted = requested; + while (true) { + data = (attempted == 0) ? &dummy_buffer[0] : Malloc(attempted); + if (data != nullptr) { + return CreateBuffer(data, attempted, originator); + } + if (attempted == minimal) return nullptr; + attempted = minimal + (attempted - minimal - 1) / 2; + } +} + +bool HeapBufferAllocator::ReallocateInternal( + const size_t requested, + const size_t minimal, + Buffer* const buffer, + BufferAllocator* const originator) { + DCHECK_LE(minimal, requested); + void* data; + size_t attempted = requested; + while (true) { + if (attempted == 0) { + if (buffer->size() > 0) free(buffer->data()); + data = &dummy_buffer[0]; + } else { + if (buffer->size() > 0) { + data = Realloc(buffer->data(), buffer->size(), attempted); + } else { + data = Malloc(attempted); + } + } + if (data != nullptr) { + UpdateBuffer(data, attempted, buffer); + return true; + } + if (attempted == minimal) return false; + attempted = minimal + (attempted - minimal - 1) / 2; + } +} + +void HeapBufferAllocator::FreeInternal(Buffer* buffer) { + if (buffer->size() > 0) free(buffer->data()); +} + +void* HeapBufferAllocator::Malloc(size_t size) { + if (aligned_mode_) { + void* data; + if (posix_memalign(&data, 16, KUDU_ALIGN_UP(size, 16))) { + return nullptr; + } + return data; + } else { + return malloc(size); + } +} + +void* HeapBufferAllocator::Realloc(void* previousData, size_t previousSize, + size_t newSize) { + if (aligned_mode_) { + void* data = Malloc(newSize); + if (data) { +// NOTE(ptab): We should use realloc here to avoid memmory coping, +// but it doesn't work on memory allocated by posix_memalign(...). +// realloc reallocates the memory but doesn't preserve the content. +// TODO(ptab): reiterate after some time to check if it is fixed (tcmalloc ?) + memcpy(data, previousData, min(previousSize, newSize)); + free(previousData); + return data; + } else { + return nullptr; + } + } else { + return realloc(previousData, newSize); + } +} + +Buffer* ClearingBufferAllocator::AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) { + Buffer* buffer = DelegateAllocate(delegate_, requested, minimal, + originator); + if (buffer != nullptr) memset(buffer->data(), 0, buffer->size()); + return buffer; +} + +bool ClearingBufferAllocator::ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) { + size_t offset = (buffer != nullptr ? buffer->size() : 0); + bool success = DelegateReallocate(delegate_, requested, minimal, buffer, + originator); + if (success && buffer->size() > offset) { + memset(static_cast<char*>(buffer->data()) + offset, 0, + buffer->size() - offset); + } + return success; +} + +void ClearingBufferAllocator::FreeInternal(Buffer* buffer) { + DelegateFree(delegate_, buffer); +} + +Buffer* MediatingBufferAllocator::AllocateInternal( + const size_t requested, + const size_t minimal, + BufferAllocator* const originator) { + // Allow the mediator to trim the request. + size_t granted; + if (requested > 0) { + granted = mediator_->Allocate(requested, minimal); + if (granted < minimal) return nullptr; + } else { + granted = 0; + } + Buffer* buffer = DelegateAllocate(delegate_, granted, minimal, originator); + if (buffer == nullptr) { + mediator_->Free(granted); + } else if (buffer->size() < granted) { + mediator_->Free(granted - buffer->size()); + } + return buffer; +} + +bool MediatingBufferAllocator::ReallocateInternal( + const size_t requested, + const size_t minimal, + Buffer* const buffer, + BufferAllocator* const originator) { + // Allow the mediator to trim the request. Be conservative; assume that + // realloc may degenerate to malloc-memcpy-free. + size_t granted; + if (requested > 0) { + granted = mediator_->Allocate(requested, minimal); + if (granted < minimal) return false; + } else { + granted = 0; + } + size_t old_size = buffer->size(); + if (DelegateReallocate(delegate_, granted, minimal, buffer, originator)) { + mediator_->Free(granted - buffer->size() + old_size); + return true; + } else { + mediator_->Free(granted); + return false; + } +} + +void MediatingBufferAllocator::FreeInternal(Buffer* buffer) { + mediator_->Free(buffer->size()); + DelegateFree(delegate_, buffer); +} + +Buffer* MemoryStatisticsCollectingBufferAllocator::AllocateInternal( + const size_t requested, + const size_t minimal, + BufferAllocator* const originator) { + Buffer* buffer = DelegateAllocate(delegate_, requested, minimal, originator); + if (buffer != nullptr) { + memory_stats_collector_->AllocatedMemoryBytes(buffer->size()); + } else { + memory_stats_collector_->RefusedMemoryBytes(minimal); + } + return buffer; +} + +bool MemoryStatisticsCollectingBufferAllocator::ReallocateInternal( + const size_t requested, + const size_t minimal, + Buffer* const buffer, + BufferAllocator* const originator) { + const size_t old_size = buffer->size(); + bool outcome = DelegateReallocate(delegate_, requested, minimal, buffer, + originator); + if (buffer->size() > old_size) { + memory_stats_collector_->AllocatedMemoryBytes(buffer->size() - old_size); + } else if (buffer->size() < old_size) { + memory_stats_collector_->FreedMemoryBytes(old_size - buffer->size()); + } else if (!outcome && (minimal > buffer->size())) { + memory_stats_collector_->RefusedMemoryBytes(minimal - buffer->size()); + } + return outcome; +} + +void MemoryStatisticsCollectingBufferAllocator::FreeInternal(Buffer* buffer) { + DelegateFree(delegate_, buffer); + memory_stats_collector_->FreedMemoryBytes(buffer->size()); +} + +size_t MemoryTrackingBufferAllocator::Available() const { + return enforce_limit_ ? mem_tracker_->SpareCapacity() : std::numeric_limits<int64_t>::max(); +} + +bool MemoryTrackingBufferAllocator::TryConsume(int64_t bytes) { + // Calls TryConsume first, even if enforce_limit_ is false: this + // will cause mem_tracker_ to try to free up more memory by GCing. + if (!mem_tracker_->TryConsume(bytes)) { + if (enforce_limit_) { + return false; + } else { + // If enforce_limit_ is false, allocate memory anyway. + mem_tracker_->Consume(bytes); + } + } + return true; +} + +Buffer* MemoryTrackingBufferAllocator::AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) { + if (TryConsume(requested)) { + Buffer* buffer = DelegateAllocate(delegate_, requested, requested, originator); + if (buffer == nullptr) { + mem_tracker_->Release(requested); + } else { + return buffer; + } + } + + if (TryConsume(minimal)) { + Buffer* buffer = DelegateAllocate(delegate_, minimal, minimal, originator); + if (buffer == nullptr) { + mem_tracker_->Release(minimal); + } + return buffer; + } + + return nullptr; +} + + +bool MemoryTrackingBufferAllocator::ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) { + LOG(FATAL) << "Not implemented"; + return false; +} + +void MemoryTrackingBufferAllocator::FreeInternal(Buffer* buffer) { + DelegateFree(delegate_, buffer); + mem_tracker_->Release(buffer->size()); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/memory.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/memory.h b/be/src/kudu/util/memory/memory.h new file mode 100644 index 0000000..eabc142 --- /dev/null +++ b/be/src/kudu/util/memory/memory.h @@ -0,0 +1,976 @@ +// Copyright 2010 Google Inc. All Rights Reserved +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// +// Classes for memory management, used by materializations +// (arenas, segments, and STL collections parametrized via arena allocators) +// so that memory usage can be controlled at the application level. +// +// Materializations can be parametrized by specifying an instance of a +// BufferAllocator. The allocator implements +// memory management policy (e.g. setting allocation limits). Allocators may +// be shared between multiple materializations; e.g. you can designate a +// single allocator per a single user request, thus setting bounds on memory +// usage on a per-request basis. + +#ifndef KUDU_UTIL_MEMORY_MEMORY_H_ +#define KUDU_UTIL_MEMORY_MEMORY_H_ + +#include <algorithm> +#include <glog/logging.h> +#include <limits> +#include <memory> +#include <stddef.h> +#include <vector> + +#include "kudu/util/boost_mutex_utils.h" +#include "kudu/util/memory/overwrite.h" +#include "kudu/util/mutex.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/logging-inl.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/singleton.h" + +using std::copy; +using std::max; +using std::min; +using std::numeric_limits; +using std::reverse; +using std::sort; +using std::swap; +using std::vector; + +namespace kudu { + +class BufferAllocator; +class MemTracker; + +// Wrapper for a block of data allocated by a BufferAllocator. Owns the block. +// (To release the block, destroy the buffer - it will then return it via the +// same allocator that has been used to create it). +class Buffer { + public: + ~Buffer(); + + void* data() const { return data_; } // The data buffer. + size_t size() const { return size_; } // In bytes. + + private: + friend class BufferAllocator; + + Buffer(void* data, size_t size, BufferAllocator* allocator) + : data_(CHECK_NOTNULL(data)), + size_(size), + allocator_(allocator) { +#ifndef NDEBUG + OverwriteWithPattern(reinterpret_cast<char*>(data_), size_, + "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW" + "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW" + "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW"); +#endif + } + + // Called by a successful realloc. + void Update(void* new_data, size_t new_size) { +#ifndef NDEBUG + if (new_size > size_) { + OverwriteWithPattern(reinterpret_cast<char*>(new_data) + size_, + new_size - size_, "NEW"); + } +#endif + data_ = new_data; + size_ = new_size; + } + + void* data_; + size_t size_; + BufferAllocator* const allocator_; + DISALLOW_COPY_AND_ASSIGN(Buffer); +}; + +// Allocators allow applications to control memory usage. They are +// used by materializations to allocate blocks of memory arenas. +// BufferAllocator is an abstract class that defines a common contract of +// all implementations of allocators. Specific allocators provide specific +// features, e.g. enforced resource limits, thread safety, etc. +class BufferAllocator { + public: + virtual ~BufferAllocator() {} + + // Called by the user when a new block of memory is needed. The 'requested' + // parameter specifies how much memory (in bytes) the user would like to get. + // The 'minimal' parameter specifies how much he is willing to settle for. + // The allocator returns a buffer sized in the range [minimal, requested], + // or NULL if the request can't be satisfied. When the buffer is destroyed, + // its destructor calls the FreeInternal() method on its allocator. + // CAVEAT: The allocator must outlive all buffers returned by it. + // + // Corner cases: + // 1. If requested == 0, the allocator will always return a non-NULL Buffer + // with a non-NULL data pointer and zero capacity. + // 2. If minimal == 0, the allocator will always return a non-NULL Buffer + // with a non-NULL data pointer, possibly with zero capacity. + Buffer* BestEffortAllocate(size_t requested, size_t minimal) { + DCHECK_LE(minimal, requested); + Buffer* result = AllocateInternal(requested, minimal, this); + LogAllocation(requested, minimal, result); + return result; + } + + // Called by the user when a new block of memory is needed. Equivalent to + // BestEffortAllocate(requested, requested). + Buffer* Allocate(size_t requested) { + return BestEffortAllocate(requested, requested); + } + + // Called by the user when a previously allocated block needs to be resized. + // Mimics semantics of <cstdlib> realloc. The 'requested' and 'minimal' + // represent the desired final buffer size, with semantics as in the Allocate. + // If the 'buffer' parameter is NULL, the call is equivalent to + // Allocate(requested, minimal). Otherwise, a reallocation of the buffer's + // data is attempted. On success, the original 'buffer' parameter is returned, + // but the buffer itself might have updated size and data. On failure, + // returns NULL, and leaves the input buffer unmodified. + // Reallocation might happen in-place, preserving the original data + // pointer, but it is not guaranteed - e.g. this function might degenerate to + // Allocate-Copy-Free. Either way, the content of the data buffer, up to the + // minimum of the new and old size, is preserved. + // + // Corner cases: + // 1. If requested == 0, the allocator will always return a non-NULL Buffer + // with a non-NULL data pointer and zero capacity. + // 2. If minimal == 0, the allocator will always return a non-NULL Buffer + // with a non-NULL data pointer, possibly with zero capacity. + Buffer* BestEffortReallocate(size_t requested, + size_t minimal, + Buffer* buffer) { + DCHECK_LE(minimal, requested); + Buffer* result; + if (buffer == NULL) { + result = AllocateInternal(requested, minimal, this); + LogAllocation(requested, minimal, result); + return result; + } else { + result = ReallocateInternal(requested, minimal, buffer, this) ? + buffer : NULL; + LogAllocation(requested, minimal, buffer); + return result; + } + } + + // Called by the user when a previously allocated block needs to be resized. + // Equivalent to BestEffortReallocate(requested, requested, buffer). + Buffer* Reallocate(size_t requested, Buffer* buffer) { + return BestEffortReallocate(requested, requested, buffer); + } + + // Returns the amount of memory (in bytes) still available for this allocator. + // For unbounded allocators (like raw HeapBufferAllocator) this is the highest + // size_t value possible. + // TODO(user): consider making pure virtual. + virtual size_t Available() const { return numeric_limits<size_t>::max(); } + + protected: + friend class Buffer; + + BufferAllocator() {} + + // Expose the constructor to subclasses of BufferAllocator. + Buffer* CreateBuffer(void* data, + size_t size, + BufferAllocator* allocator) { + return new Buffer(data, size, allocator); + } + + // Expose Buffer::Update to subclasses of BufferAllocator. + void UpdateBuffer(void* new_data, size_t new_size, Buffer* buffer) { + buffer->Update(new_data, new_size); + } + + // Called by chained buffer allocators. + Buffer* DelegateAllocate(BufferAllocator* delegate, + size_t requested, + size_t minimal, + BufferAllocator* originator) { + return delegate->AllocateInternal(requested, minimal, originator); + } + + // Called by chained buffer allocators. + bool DelegateReallocate(BufferAllocator* delegate, + size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) { + return delegate->ReallocateInternal(requested, minimal, buffer, originator); + } + + // Called by chained buffer allocators. + void DelegateFree(BufferAllocator* delegate, Buffer* buffer) { + delegate->FreeInternal(buffer); + } + + private: + // Implemented by concrete subclasses. + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) = 0; + + // Implemented by concrete subclasses. Returns false on failure. + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) = 0; + + // Implemented by concrete subclasses. + virtual void FreeInternal(Buffer* buffer) = 0; + + // Logs a warning message if the allocation failed or if it returned less than + // the required number of bytes. + void LogAllocation(size_t required, size_t minimal, Buffer* buffer); + + DISALLOW_COPY_AND_ASSIGN(BufferAllocator); +}; + +// Allocates buffers on the heap, with no memory limits. Uses standard C +// allocation functions (malloc, realloc, free). +class HeapBufferAllocator : public BufferAllocator { + public: + virtual ~HeapBufferAllocator() {} + + // Returns a singleton instance of the heap allocator. + static HeapBufferAllocator* Get() { + return Singleton<HeapBufferAllocator>::get(); + } + + virtual size_t Available() const OVERRIDE { + return numeric_limits<size_t>::max(); + } + + private: + // Allocates memory that is aligned to 16 way. + // Use if you want to boost SIMD operations on the memory area. + const bool aligned_mode_; + + friend class Singleton<HeapBufferAllocator>; + + // Always allocates 'requested'-sized buffer, or returns NULL on OOM. + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + void* Malloc(size_t size); + void* Realloc(void* previousData, size_t previousSize, size_t newSize); + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + HeapBufferAllocator(); + explicit HeapBufferAllocator(bool aligned_mode) + : aligned_mode_(aligned_mode) {} + + DISALLOW_COPY_AND_ASSIGN(HeapBufferAllocator); +}; + +// Wrapper around the delegate allocator, that clears all newly allocated +// (and reallocated) memory. +class ClearingBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate. + explicit ClearingBufferAllocator(BufferAllocator* delegate) + : delegate_(delegate) {} + + virtual size_t Available() const OVERRIDE { + return delegate_->Available(); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + BufferAllocator* delegate_; + DISALLOW_COPY_AND_ASSIGN(ClearingBufferAllocator); +}; + +// Abstract policy for modifying allocation requests - e.g. enforcing quotas. +class Mediator { + public: + Mediator() {} + virtual ~Mediator() {} + + // Called by an allocator when a allocation request is processed. + // Must return a value in the range [minimal, requested], or zero. Returning + // zero (if minimal is non-zero) indicates denial to allocate. Returning + // non-zero indicates that the request should be capped at that value. + virtual size_t Allocate(size_t requested, size_t minimal) = 0; + + // Called by an allocator when the specified amount (in bytes) is released. + virtual void Free(size_t amount) = 0; + + // TODO(user): consider making pure virtual. + virtual size_t Available() const { return numeric_limits<size_t>::max(); } +}; + +// Optionally thread-safe skeletal implementation of a 'quota' abstraction, +// providing methods to allocate resources against the quota, and return them. +template<bool thread_safe> +class Quota : public Mediator { + public: + explicit Quota(bool enforced) : usage_(0), enforced_(enforced) {} + virtual ~Quota() {} + + // Returns a value in range [minimal, requested] if not exceeding remaining + // quota or if the quota is not enforced (soft quota), and adjusts the usage + // value accordingly. Otherwise, returns zero. The semantics of 'remaining + // quota' are defined by subclasses (that must supply GetQuotaInternal() + // method). + virtual size_t Allocate(size_t requested, size_t minimal) OVERRIDE; + + virtual void Free(size_t amount) OVERRIDE; + + // Returns memory still available in the quota. For unenforced Quota objects, + // you are still able to perform _minimal_ allocations when the available + // quota is 0 (or less than "minimal" param). + virtual size_t Available() const OVERRIDE { + lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex()); + const size_t quota = GetQuotaInternal(); + return (usage_ >= quota) ? 0 : (quota - usage_); + } + + // Returns the current quota value. + size_t GetQuota() const; + + // Returns the current usage value, defined as a sum of all the values + // granted by calls to Allocate, less these released via calls to Free. + size_t GetUsage() const; + + bool enforced() const { + return enforced_; + } + + protected: + // Overridden by specific implementations, to define semantics of + // the quota, i.e. the total amount of resources that the mediator will + // allocate. Called directly from GetQuota that optionally provides + // thread safety. An 'Allocate' request will succeed if + // GetUsage() + minimal <= GetQuota() or if the quota is not enforced (soft + // quota). + virtual size_t GetQuotaInternal() const = 0; + + Mutex* mutex() const { return thread_safe ? &mutex_ : NULL; } + + private: + mutable Mutex mutex_; + size_t usage_; + bool enforced_; + DISALLOW_COPY_AND_ASSIGN(Quota); +}; + +// Optionally thread-safe static quota implementation (where quota is explicitly +// set to a concrete numeric value). +template<bool thread_safe> +class StaticQuota : public Quota<thread_safe> { + public: + explicit StaticQuota(size_t quota) + : Quota<thread_safe>(true) { + SetQuota(quota); + } + StaticQuota(size_t quota, bool enforced) + : Quota<thread_safe>(enforced) { + SetQuota(quota); + } + virtual ~StaticQuota() {} + + // Sets quota to the new value. + void SetQuota(const size_t quota); + + protected: + virtual size_t GetQuotaInternal() const { return quota_; } + + private: + size_t quota_; + DISALLOW_COPY_AND_ASSIGN(StaticQuota); +}; + +// Places resource limits on another allocator, using the specified Mediator +// (e.g. quota) implementation. +// +// If the mediator and the delegate allocator are thread-safe, this allocator +// is also thread-safe, to the extent that it will not introduce any +// state inconsistencies. However, without additional synchronization, +// allocation requests are not atomic end-to-end. This way, it is deadlock- +// resilient (even if you have cyclic relationships between allocators) and +// allows better concurrency. But, it may cause over-conservative +// allocations under memory contention, if you have multiple levels of +// mediating allocators. For example, if two requests that can't both be +// satisfied are submitted concurrently, it may happen that one of them succeeds +// but gets smaller buffer allocated than it would if the requests were strictly +// ordered. This is usually not a problem, however, as you don't really want to +// operate so close to memory limits that some of your allocations can't be +// satisfied. If you do have a simple, cascading graph of allocators though, +// and want to force requests be atomic end-to-end, put a +// ThreadSafeBufferAllocator at the entry point. +class MediatingBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate, nor the mediator, allowing + // both to be reused. + MediatingBufferAllocator(BufferAllocator* const delegate, + Mediator* const mediator) + : delegate_(delegate), + mediator_(mediator) {} + + virtual ~MediatingBufferAllocator() {} + + virtual size_t Available() const OVERRIDE { + return min(delegate_->Available(), mediator_->Available()); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + BufferAllocator* delegate_; + Mediator* const mediator_; +}; + +// Convenience non-thread-safe static memory bounds enforcer. +// Combines MediatingBufferAllocator with a StaticQuota. +class MemoryLimit : public BufferAllocator { + public: + // Creates a limiter based on the default, heap allocator. Quota is infinite. + // (Can be set using SetQuota). + MemoryLimit() + : quota_(std::numeric_limits<size_t>::max()), + allocator_(HeapBufferAllocator::Get(), "a_) {} + + // Creates a limiter based on the default, heap allocator. + explicit MemoryLimit(size_t quota) + : quota_(quota), + allocator_(HeapBufferAllocator::Get(), "a_) {} + + // Creates a limiter relaying to the specified delegate allocator. + MemoryLimit(size_t quota, BufferAllocator* const delegate) + : quota_(quota), + allocator_(delegate, "a_) {} + + // Creates a (possibly non-enforcing) limiter relaying to the specified + // delegate allocator. + MemoryLimit(size_t quota, bool enforced, BufferAllocator* const delegate) + : quota_(quota, enforced), + allocator_(delegate, "a_) {} + + virtual ~MemoryLimit() {} + + virtual size_t Available() const OVERRIDE { + return allocator_.Available(); + } + + size_t GetQuota() const { return quota_.GetQuota(); } + size_t GetUsage() const { return quota_.GetUsage(); } + void SetQuota(const size_t quota) { quota_.SetQuota(quota); } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + return DelegateAllocate(&allocator_, requested, minimal, originator); + } + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + return DelegateReallocate(&allocator_, requested, minimal, buffer, + originator); + } + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + DelegateFree(&allocator_, buffer); + } + + StaticQuota<false> quota_; + MediatingBufferAllocator allocator_; +}; + +// An allocator that allows to bypass the (potential) soft quota below for a +// given amount of memory usage. The goal is to make the allocation methods and +// Available() work as if the allocator below had at least bypassed_amount of +// soft quota. Of course this class doesn't allow to exceed the hard quota. +class SoftQuotaBypassingBufferAllocator : public BufferAllocator { + public: + SoftQuotaBypassingBufferAllocator(BufferAllocator* allocator, + size_t bypassed_amount) + : allocator_(std::numeric_limits<size_t>::max(), allocator), + bypassed_amount_(bypassed_amount) {} + + virtual size_t Available() const OVERRIDE { + const size_t usage = allocator_.GetUsage(); + size_t available = allocator_.Available(); + if (bypassed_amount_ > usage) { + available = max(bypassed_amount_ - usage, available); + } + return available; + } + + private: + // Calculates how much to increase the minimal parameter to allocate more + // aggressively in the underlying allocator. This is to avoid getting only + // very small allocations when we exceed the soft quota below. The request + // with increased minimal size is more likely to fail because of exceeding + // hard quota, so we also fall back to the original minimal size. + size_t AdjustMinimal(size_t requested, size_t minimal) const { + return min(requested, max(minimal, Available())); + } + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + // Try increasing the "minimal" parameter to allocate more aggresively + // within the bypassed amount of soft quota. + Buffer* result = DelegateAllocate(&allocator_, + requested, + AdjustMinimal(requested, minimal), + originator); + if (result != NULL) { + return result; + } else { + return DelegateAllocate(&allocator_, + requested, + minimal, + originator); + } + } + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + if (DelegateReallocate(&allocator_, + requested, + AdjustMinimal(requested, minimal), + buffer, + originator)) { + return true; + } else { + return DelegateReallocate(&allocator_, + requested, + minimal, + buffer, + originator); + } + } + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + DelegateFree(&allocator_, buffer); + } + + // Using MemoryLimit with "infinite" limit to get GetUsage(). + MemoryLimit allocator_; + size_t bypassed_amount_; +}; + +// An interface for a MemoryStatisticsCollector - an object which collects +// information about the memory usage of the allocator. The collector will +// gather statistics about memory usage based on information received from the +// allocator. +class MemoryStatisticsCollectorInterface { + public: + MemoryStatisticsCollectorInterface() {} + + virtual ~MemoryStatisticsCollectorInterface() {} + + // Informs the collector that the allocator granted bytes memory. Note that in + // the case of reallocation bytes should be the increase in total memory + // usage, not the total size of the buffer after reallocation. + virtual void AllocatedMemoryBytes(size_t bytes) = 0; + + // Informs the collector that the allocator received a request for at least + // bytes memory, and rejected it (meaning that it granted nothing). + virtual void RefusedMemoryBytes(size_t bytes) = 0; + + // Informs the collector that bytes memory have been released to the + // allocator. + virtual void FreedMemoryBytes(size_t bytes) = 0; + + private: + DISALLOW_COPY_AND_ASSIGN(MemoryStatisticsCollectorInterface); +}; + +class MemoryStatisticsCollectingBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate. + // Takes ownership of memory_stats_collector. + MemoryStatisticsCollectingBufferAllocator( + BufferAllocator* const delegate, + MemoryStatisticsCollectorInterface* const memory_stats_collector) + : delegate_(delegate), + memory_stats_collector_(memory_stats_collector) {} + + virtual ~MemoryStatisticsCollectingBufferAllocator() {} + + virtual size_t Available() const OVERRIDE { + return delegate_->Available(); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + BufferAllocator* delegate_; + gscoped_ptr<MemoryStatisticsCollectorInterface> + memory_stats_collector_; +}; + +// BufferAllocator which uses MemTracker to keep track of and optionally +// (if a limit is set on the MemTracker) regulate memory consumption. +class MemoryTrackingBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate. The delegate must remain + // valid for the lifetime of this allocator. Increments reference + // count for 'mem_tracker'. + // If 'mem_tracker' has a limit and 'enforce_limit' is true, then + // the classes calling this buffer allocator (whether directly, or + // through an Arena) must be able to handle the case when allocation + // fails. If 'enforce_limit' is false (this is the default), then + // allocation will always succeed. + MemoryTrackingBufferAllocator(BufferAllocator* const delegate, + std::shared_ptr<MemTracker> mem_tracker, + bool enforce_limit = false) + : delegate_(delegate), + mem_tracker_(std::move(mem_tracker)), + enforce_limit_(enforce_limit) {} + + virtual ~MemoryTrackingBufferAllocator() {} + + // If enforce limit is false, this always returns maximum possible value + // for int64_t (std::numeric_limits<int64_t>::max()). Otherwise, this + // is equivalent to calling mem_tracker_->SpareCapacity(); + virtual size_t Available() const OVERRIDE; + + private: + + // If enforce_limit_ is true, this is equivalent to calling + // mem_tracker_->TryConsume(bytes). If enforce_limit_ is false and + // mem_tracker_->TryConsume(bytes) is false, we call + // mem_tracker_->Consume(bytes) and always return true. + bool TryConsume(int64_t bytes); + + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + BufferAllocator* delegate_; + std::shared_ptr<MemTracker> mem_tracker_; + bool enforce_limit_; +}; + +// Synchronizes access to AllocateInternal and FreeInternal, and exposes the +// mutex for use by subclasses. Allocation requests performed through this +// allocator are atomic end-to-end. Template parameter DelegateAllocatorType +// allows to specify a subclass of BufferAllocator for the delegate, to allow +// subclasses of ThreadSafeBufferAllocator to access additional methods provided +// by the allocator subclass. If this is not needed, it can be set to +// BufferAllocator. +template <class DelegateAllocatorType> +class ThreadSafeBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate. + explicit ThreadSafeBufferAllocator(DelegateAllocatorType* delegate) + : delegate_(delegate) {} + virtual ~ThreadSafeBufferAllocator() {} + + virtual size_t Available() const OVERRIDE { + lock_guard_maybe<Mutex> lock(mutex()); + return delegate()->Available(); + } + + protected: + Mutex* mutex() const { return &mutex_; } + // Expose the delegate allocator, with the precise type of the allocator + // specified by the template parameter. The delegate() methods themselves + // don't give any thread-safety guarantees. Protect all uses taking the Mutex + // exposed by the mutex() method. + DelegateAllocatorType* delegate() { return delegate_; } + const DelegateAllocatorType* delegate() const { return delegate_; } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + lock_guard_maybe<Mutex> lock(mutex()); + return DelegateAllocate(delegate(), requested, minimal, originator); + } + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + lock_guard_maybe<Mutex> lock(mutex()); + return DelegateReallocate(delegate(), requested, minimal, buffer, + originator); + } + + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + lock_guard_maybe<Mutex> lock(mutex()); + DelegateFree(delegate(), buffer); + } + + DelegateAllocatorType* delegate_; + mutable Mutex mutex_; + DISALLOW_COPY_AND_ASSIGN(ThreadSafeBufferAllocator); +}; + +// A version of ThreadSafeBufferAllocator that owns the supplied delegate +// allocator. +template <class DelegateAllocatorType> +class OwningThreadSafeBufferAllocator + : public ThreadSafeBufferAllocator<DelegateAllocatorType> { + public: + explicit OwningThreadSafeBufferAllocator(DelegateAllocatorType* delegate) + : ThreadSafeBufferAllocator<DelegateAllocatorType>(delegate), + delegate_owned_(delegate) {} + virtual ~OwningThreadSafeBufferAllocator() {} + + private: + gscoped_ptr<DelegateAllocatorType> delegate_owned_; +}; + +class ThreadSafeMemoryLimit + : public OwningThreadSafeBufferAllocator<MemoryLimit> { + public: + ThreadSafeMemoryLimit(size_t quota, bool enforced, + BufferAllocator* const delegate) + : OwningThreadSafeBufferAllocator<MemoryLimit>( + new MemoryLimit(quota, enforced, delegate)) {} + virtual ~ThreadSafeMemoryLimit() {} + + size_t GetQuota() const { + lock_guard_maybe<Mutex> lock(mutex()); + return delegate()->GetQuota(); + } + size_t GetUsage() const { + lock_guard_maybe<Mutex> lock(mutex()); + return delegate()->GetUsage(); + } + void SetQuota(const size_t quota) { + lock_guard_maybe<Mutex> lock(mutex()); + delegate()->SetQuota(quota); + } +}; + +// A BufferAllocator that can be given ownership of many objects of given type. +// These objects will then be deleted when the buffer allocator is destroyed. +// The objects added last are deleted first (LIFO). +template <typename OwnedType> +class OwningBufferAllocator : public BufferAllocator { + public: + // Doesn't take ownership of delegate. + explicit OwningBufferAllocator(BufferAllocator* const delegate) + : delegate_(delegate) {} + + virtual ~OwningBufferAllocator() { + // Delete elements starting from the end. + while (!owned_.empty()) { + OwnedType* p = owned_.back(); + owned_.pop_back(); + delete p; + } + } + + // Add to the collection of objects owned by this allocator. The object added + // last is deleted first. + OwningBufferAllocator* Add(OwnedType* p) { + owned_.push_back(p); + return this; + } + + virtual size_t Available() const OVERRIDE { + return delegate_->Available(); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + return DelegateAllocate(delegate_, requested, minimal, originator); + } + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + return DelegateReallocate(delegate_, requested, minimal, buffer, + originator); + } + + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + DelegateFree(delegate_, buffer); + } + + // Not using PointerVector here because we want to guarantee certain order of + // deleting elements (starting from the ones added last). + vector<OwnedType*> owned_; + BufferAllocator* delegate_; +}; + +// Buffer allocator that tries to guarantee the exact and consistent amount +// of memory. Uses hard MemoryLimit to enforce the upper bound but also +// guarantees consistent allocations by ignoring minimal requested amounts and +// always returning the full amount of memory requested if available. +// Allocations will fail if the memory requested would exceed the quota or if +// the underlying allocator fails to provide the memory. +class GuaranteeMemory : public BufferAllocator { + public: + // Doesn't take ownership of 'delegate'. + GuaranteeMemory(size_t memory_quota, + BufferAllocator* delegate) + : limit_(memory_quota, true, delegate), + memory_guarantee_(memory_quota) {} + + virtual size_t Available() const OVERRIDE { + return memory_guarantee_ - limit_.GetUsage(); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + if (requested > Available()) { + return NULL; + } else { + return DelegateAllocate(&limit_, requested, requested, originator); + } + } + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + int64 additional_memory = requested - (buffer != NULL ? buffer->size() : 0); + return additional_memory <= static_cast<int64>(Available()) + && DelegateReallocate(&limit_, requested, requested, + buffer, originator); + } + + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + DelegateFree(&limit_, buffer); + } + + MemoryLimit limit_; + size_t memory_guarantee_; + DISALLOW_COPY_AND_ASSIGN(GuaranteeMemory); +}; + +// Implementation of inline and template methods + +template<bool thread_safe> +size_t Quota<thread_safe>::Allocate(const size_t requested, + const size_t minimal) { + lock_guard_maybe<Mutex> lock(mutex()); + DCHECK_LE(minimal, requested) + << "\"minimal\" shouldn't be bigger than \"requested\""; + const size_t quota = GetQuotaInternal(); + size_t allocation; + if (usage_ > quota || minimal > quota - usage_) { + // OOQ (Out of quota). + if (!enforced() && minimal <= numeric_limits<size_t>::max() - usage_) { + // The quota is unenforced and the value of "minimal" won't cause an + // overflow. Perform a minimal allocation. + allocation = minimal; + } else { + allocation = 0; + } + LOG(WARNING) << "Out of quota. Requested: " << requested + << " bytes, or at least minimal: " << minimal + << ". Current quota value is: " << quota + << " while current usage is: " << usage_ + << ". The quota is " << (enforced() ? "" : "not ") + << "enforced. " + << ((allocation == 0) ? "Did not allocate any memory." + : "Allocated the minimal value requested."); + } else { + allocation = min(requested, quota - usage_); + } + usage_ += allocation; + return allocation; +} + +template<bool thread_safe> +void Quota<thread_safe>::Free(size_t amount) { + lock_guard_maybe<Mutex> lock(mutex()); + usage_ -= amount; + // threads allocate/free memory concurrently via the same Quota object that is + // not protected with a mutex (thread_safe == false). + if (usage_ > (numeric_limits<size_t>::max() - (1 << 28))) { + LOG(ERROR) << "Suspiciously big usage_ value: " << usage_ + << " (could be a result size_t wrapping around below 0, " + << "for example as a result of race condition)."; + } +} + +template<bool thread_safe> +size_t Quota<thread_safe>::GetQuota() const { + lock_guard_maybe<Mutex> lock(mutex()); + return GetQuotaInternal(); +} + +template<bool thread_safe> +size_t Quota<thread_safe>::GetUsage() const { + lock_guard_maybe<Mutex> lock(mutex()); + return usage_; +} + +template<bool thread_safe> +void StaticQuota<thread_safe>::SetQuota(const size_t quota) { + lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex()); + quota_ = quota; +} + +} // namespace kudu + +#endif // KUDU_UTIL_MEMORY_MEMORY_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/overwrite.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/overwrite.cc b/be/src/kudu/util/memory/overwrite.cc new file mode 100644 index 0000000..c70cf4a --- /dev/null +++ b/be/src/kudu/util/memory/overwrite.cc @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/memory/overwrite.h" + +#include "kudu/gutil/strings/stringpiece.h" + +#include <string.h> +#include <glog/logging.h> +namespace kudu { + +void OverwriteWithPattern(char* p, size_t len, StringPiece pattern) { + size_t pat_len = pattern.size(); + CHECK_LT(0, pat_len); + size_t rem = len; + const char *pat_ptr = pattern.data(); + + while (rem >= pat_len) { + memcpy(p, pat_ptr, pat_len); + p += pat_len; + rem -= pat_len; + } + + while (rem-- > 0) { + *p++ = *pat_ptr++; + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/overwrite.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/overwrite.h b/be/src/kudu/util/memory/overwrite.h new file mode 100644 index 0000000..04286ed --- /dev/null +++ b/be/src/kudu/util/memory/overwrite.h @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_MEMORY_OVERWRITE_H +#define KUDU_MEMORY_OVERWRITE_H + +#include "kudu/gutil/strings/stringpiece.h" + +namespace kudu { + +// Overwrite 'p' with enough repetitions of 'pattern' to fill 'len' +// bytes. This is optimized at -O3 even in debug builds, so is +// reasonably efficient to use. +void OverwriteWithPattern(char* p, size_t len, StringPiece pattern); + +} // namespace kudu +#endif /* KUDU_MEMORY_OVERWRITE_H */ +
