http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memcmpable_varint.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memcmpable_varint.cc b/be/src/kudu/util/memcmpable_varint.cc new file mode 100644 index 0000000..b30eff6 --- /dev/null +++ b/be/src/kudu/util/memcmpable_varint.cc @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// This file contains code derived from sqlite4, distributed in the public domain. +// +// A variable length integer is an encoding of 64-bit unsigned integers +// into between 1 and 9 bytes. The encoding is designed so that small +// (and common) values take much less space than larger values. Additional +// properties: +// +// * The length of the varint can be determined after examining just +// the first byte of the encoding. +// +// * Varints compare in numerical order using memcmp(). +// +//************************************************************************ +// +// Treat each byte of the encoding as an unsigned integer between 0 and 255. +// Let the bytes of the encoding be called A0, A1, A2, ..., A8. +// +// DECODE +// +// If A0 is between 0 and 240 inclusive, then the result is the value of A0. +// +// If A0 is between 241 and 248 inclusive, then the result is +// 240+256*(A0-241)+A1. +// +// If A0 is 249 then the result is 2288+256*A1+A2. +// +// If A0 is 250 then the result is A1..A3 as a 3-byte big-endian integer. +// +// If A0 is 251 then the result is A1..A4 as a 4-byte big-endian integer. +// +// If A0 is 252 then the result is A1..A5 as a 5-byte big-endian integer. +// +// If A0 is 253 then the result is A1..A6 as a 6-byte big-endian integer. +// +// If A0 is 254 then the result is A1..A7 as a 7-byte big-endian integer. +// +// If A0 is 255 then the result is A1..A8 as an 8-byte big-endian integer. +// +// ENCODE +// +// Let the input value be V. +// +// If V<=240 then output a single byte A0 equal to V. +// +// If V<=2287 then output A0 as (V-240)/256 + 241 and A1 as (V-240)%256. +// +// If V<=67823 then output A0 as 249, A1 as (V-2288)/256, and A2 +// as (V-2288)%256. +// +// If V<=16777215 then output A0 as 250 and A1 through A3 as a big-endian +// 3-byte integer. +// +// If V<=4294967295 then output A0 as 251 and A1..A4 as a big-endian +// 4-byte integer. +// +// If V<=1099511627775 then output A0 as 252 and A1..A5 as a big-endian +// 5-byte integer. +// +// If V<=281474976710655 then output A0 as 253 and A1..A6 as a big-endian +// 6-byte integer. +// +// If V<=72057594037927935 then output A0 as 254 and A1..A7 as a +// big-endian 7-byte integer. +// +// Otherwise output A0 as 255 and A1..A8 as a big-endian 8-byte integer. 
+// +// SUMMARY +// +// Bytes Max Value Digits +// ------- --------- --------- +// 1 240 2.3 +// 2 2287 3.3 +// 3 67823 4.8 +// 4 2**24-1 7.2 +// 5 2**32-1 9.6 +// 6 2**40-1 12.0 +// 7 2**48-1 14.4 +// 8 2**56-1 16.8 +// 9 2**64-1 19.2 + +#include <cstddef> + +#include <glog/logging.h> + +#include "kudu/util/faststring.h" +#include "kudu/util/memcmpable_varint.h" +#include "kudu/util/slice.h" + +namespace kudu { + +//////////////////////////////////////////////////////////// +// Begin code ripped from sqlite4 +//////////////////////////////////////////////////////////// + +// This function is borrowed from sqlite4/varint.c +static void varintWrite32(uint8_t *z, uint32_t y) { + z[0] = (uint8_t)(y>>24); + z[1] = (uint8_t)(y>>16); + z[2] = (uint8_t)(y>>8); + z[3] = (uint8_t)(y); +} + + +// Write a varint into z[]. The buffer z[] must be at least 9 characters +// long to accommodate the largest possible varint. Return the number of +// bytes of z[] used. +// +// This function is borrowed from sqlite4/varint.c +static size_t sqlite4PutVarint64(uint8_t *z, uint64_t x) { + uint64_t w, y; + if (x <= 240) { + z[0] = (uint8_t)x; + return 1; + } + if (x <= 2287) { + y = (uint64_t)(x - 240); + z[0] = (uint8_t)(y/256 + 241); + z[1] = (uint8_t)(y%256); + return 2; + } + if (x <= 67823) { + y = (uint64_t)(x - 2288); + z[0] = 249; + z[1] = (uint8_t)(y/256); + z[2] = (uint8_t)(y%256); + return 3; + } + y = (uint64_t)x; + w = (uint64_t)(x>>32); + if (w == 0) { + if (y <= 16777215) { + z[0] = 250; + z[1] = (uint8_t)(y>>16); + z[2] = (uint8_t)(y>>8); + z[3] = (uint8_t)(y); + return 4; + } + z[0] = 251; + varintWrite32(z+1, y); + return 5; + } + if (w <= 255) { + z[0] = 252; + z[1] = (uint8_t)w; + varintWrite32(z+2, y); + return 6; + } + if (w <= 65535) { + z[0] = 253; + z[1] = (uint8_t)(w>>8); + z[2] = (uint8_t)w; + varintWrite32(z+3, y); + return 7; + } + if (w <= 16777215) { + z[0] = 254; + z[1] = (uint8_t)(w>>16); + z[2] = (uint8_t)(w>>8); + z[3] = (uint8_t)w; + varintWrite32(z+4, y); + return 8; + } + z[0] = 255; + varintWrite32(z+1, w); + varintWrite32(z+5, y); + return 9; +} + +// Decode the varint in the first n bytes z[]. Write the integer value +// into *pResult and return the number of bytes in the varint. 
+// +// If the decode fails because there are not enough bytes in z[] then +// return 0. +// +// Borrowed from sqlite4 varint.c +static int sqlite4GetVarint64( + const uint8_t *z, + int n, + uint64_t *p_result) { + unsigned int x; + if (n < 1) return 0; + if (z[0] <= 240) { + *p_result = z[0]; + return 1; + } + if (z[0] <= 248) { + if (n < 2) return 0; + *p_result = (z[0]-241)*256 + z[1] + 240; + return 2; + } + if (n < z[0]-246) return 0; + if (z[0] == 249) { + *p_result = 2288 + 256*z[1] + z[2]; + return 3; + } + if (z[0] == 250) { + *p_result = (z[1]<<16) + (z[2]<<8) + z[3]; + return 4; + } + x = (z[1]<<24) + (z[2]<<16) + (z[3]<<8) + z[4]; + if (z[0] == 251) { + *p_result = x; + return 5; + } + if (z[0] == 252) { + *p_result = (((uint64_t)x)<<8) + z[5]; + return 6; + } + if (z[0] == 253) { + *p_result = (((uint64_t)x)<<16) + (z[5]<<8) + z[6]; + return 7; + } + if (z[0] == 254) { + *p_result = (((uint64_t)x)<<24) + (z[5]<<16) + (z[6]<<8) + z[7]; + return 8; + } + *p_result = (((uint64_t)x)<<32) + + (0xffffffff & ((z[5]<<24) + (z[6]<<16) + (z[7]<<8) + z[8])); + return 9; +} + +//////////////////////////////////////////////////////////// +// End code ripped from sqlite4 +//////////////////////////////////////////////////////////// + +void PutMemcmpableVarint64(faststring *dst, uint64_t value) { + uint8_t buf[9]; + int used = sqlite4PutVarint64(buf, value); + DCHECK_LE(used, sizeof(buf)); + dst->append(buf, used); +} + +bool GetMemcmpableVarint64(Slice *input, uint64_t *value) { + size_t size = sqlite4GetVarint64(input->data(), input->size(), value); + input->remove_prefix(size); + return size > 0; +} + + +} // namespace kudu
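The band structure above is easiest to see with a concrete value: 1000 falls in the two-byte band (241 <= A0 <= 248), so it encodes as A0 = (1000-240)/256 + 241 = 243 and A1 = (1000-240)%256 = 248. The following standalone sketch (an editorial illustration, not part of the commit) exercises the two functions this file exports and checks the memcmp() ordering property; the constants are illustrative only.

#include <algorithm>
#include <cassert>
#include <cstring>

#include "kudu/util/faststring.h"
#include "kudu/util/memcmpable_varint.h"
#include "kudu/util/slice.h"

int main() {
  kudu::faststring a, b;
  kudu::PutMemcmpableVarint64(&a, 240);   // 1-byte encoding: 0xF0
  kudu::PutMemcmpableVarint64(&b, 1000);  // 2-byte encoding: 0xF3 0xF8

  // Numeric order is preserved by memcmp() over the encoded forms, even
  // though the encodings have different lengths.
  size_t n = std::min(a.size(), b.size());
  assert(memcmp(a.data(), b.data(), n) < 0);

  // Round-trip: decoding advances the slice past the consumed bytes.
  kudu::Slice s(b.data(), b.size());
  uint64_t v = 0;
  bool ok = kudu::GetMemcmpableVarint64(&s, &v);
  assert(ok && v == 1000 && s.empty());
  return 0;
}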
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memcmpable_varint.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memcmpable_varint.h b/be/src/kudu/util/memcmpable_varint.h new file mode 100644 index 0000000..955f89d --- /dev/null +++ b/be/src/kudu/util/memcmpable_varint.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// This is an alternate varint format, borrowed from sqlite4, that differs from the +// varint in util/coding.h in that its serialized form can be compared with memcmp(), +// yielding the same result as comparing the original integers. +// +// The serialized form also has the property that multiple such varints can be strung +// together to form a composite key, which itself is memcmpable. +// +// See memcmpable_varint.cc for further description. + +#ifndef KUDU_UTIL_MEMCMPABLE_VARINT_H +#define KUDU_UTIL_MEMCMPABLE_VARINT_H + +#include <cstdint> + +namespace kudu { + +class Slice; +class faststring; + +void PutMemcmpableVarint64(faststring *dst, uint64_t value); + +// Standard Get... routines parse a value from the beginning of a Slice +// and advance the slice past the parsed value. +bool GetMemcmpableVarint64(Slice *input, uint64_t *value); + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/arena-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/arena-test.cc b/be/src/kudu/util/memory/arena-test.cc new file mode 100644 index 0000000..695e305 --- /dev/null +++ b/be/src/kudu/util/memory/arena-test.cc @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <cstdint> +#include <cstring> +#include <memory> +#include <string> +#include <thread> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/stringprintf.h" +#include "kudu/util/memory/arena.h" +#include "kudu/util/memory/memory.h" +#include "kudu/util/mem_tracker.h" + +DEFINE_int32(num_threads, 16, "Number of threads to test"); +DEFINE_int32(allocs_per_thread, 10000, "Number of allocations each thread should do"); +DEFINE_int32(alloc_size, 4, "Number of bytes in each allocation"); + +namespace kudu { + +using std::shared_ptr; +using std::string; +using std::thread; +using std::vector; + +template<class ArenaType> +static void AllocateThread(ArenaType *arena, uint8_t thread_index) { + std::vector<void *> ptrs; + ptrs.reserve(FLAGS_allocs_per_thread); + + char buf[FLAGS_alloc_size]; + memset(buf, thread_index, FLAGS_alloc_size); + + for (int i = 0; i < FLAGS_allocs_per_thread; i++) { + void *alloced = arena->AllocateBytes(FLAGS_alloc_size); + CHECK(alloced); + memcpy(alloced, buf, FLAGS_alloc_size); + ptrs.push_back(alloced); + } + + for (void *p : ptrs) { + if (memcmp(buf, p, FLAGS_alloc_size) != 0) { + FAIL() << StringPrintf("overwritten pointer at %p", p); + } + } +} + +// Non-templated function to forward to above -- simplifies thread creation +static void AllocateThreadTSArena(ThreadSafeArena *arena, uint8_t thread_index) { + AllocateThread(arena, thread_index); +} + + +TEST(TestArena, TestSingleThreaded) { + Arena arena(128); + AllocateThread(&arena, 0); +} + + + +TEST(TestArena, TestMultiThreaded) { + CHECK(FLAGS_num_threads < 256); + + ThreadSafeArena arena(1024); + + vector<thread> threads; + for (uint8_t i = 0; i < FLAGS_num_threads; i++) { + threads.emplace_back(AllocateThreadTSArena, &arena, (uint8_t)i); + } + + for (thread& thr : threads) { + thr.join(); + } +} + +TEST(TestArena, TestAlignment) { + ThreadSafeArena arena(1024); + for (int i = 0; i < 1000; i++) { + int alignment = 1 << (i % 5); + + void *ret = arena.AllocateBytesAligned(5, alignment); + ASSERT_EQ(0, (uintptr_t)(ret) % alignment) << + "failed to align on " << alignment << "b boundary: " << + ret; + } +} + +TEST(TestArena, TestObjectAlignment) { + struct MyStruct { + int64_t v; + }; + Arena a(256); + // Allocate a junk byte to ensure that the next allocation isn't "accidentally" aligned. + a.AllocateBytes(1); + void* v = a.NewObject<MyStruct>(); + ASSERT_EQ(reinterpret_cast<uintptr_t>(v) % alignof(MyStruct), 0); +} + + +// MemTrackers update their ancestors when consuming and releasing memory to compute +// usage totals. However, the lifetimes of parent and child trackers can be different. +// Validate that child trackers can still correctly update their parent stats even when +// the parents go out of scope. +TEST(TestArena, TestMemoryTrackerParentReferences) { + // Set up a parent and child MemTracker. + const string parent_id = "parent-id"; + const string child_id = "child-id"; + shared_ptr<MemTracker> child_tracker; + { + shared_ptr<MemTracker> parent_tracker = MemTracker::CreateTracker(1024, parent_id); + child_tracker = MemTracker::CreateTracker(-1, child_id, parent_tracker); + // Parent falls out of scope here. Should still be owned by the child. + } + shared_ptr<MemoryTrackingBufferAllocator> allocator( + new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), child_tracker)); + MemoryTrackingArena arena(256, allocator); + + // Try some child operations. 
+ ASSERT_EQ(256, child_tracker->consumption()); + void *allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(256, child_tracker->consumption()); + allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(768, child_tracker->consumption()); +} + +TEST(TestArena, TestMemoryTrackingDontEnforce) { + shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker"); + shared_ptr<MemoryTrackingBufferAllocator> allocator( + new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker)); + MemoryTrackingArena arena(256, allocator); + ASSERT_EQ(256, mem_tracker->consumption()); + void *allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(256, mem_tracker->consumption()); + allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(768, mem_tracker->consumption()); + + // In DEBUG mode after Reset() the last component of an arena is + // cleared, but is then created again; in release mode, the last + // component is not cleared. In either case, after Reset() + // consumption() should equal the size of the last component which + // is 512 bytes. + arena.Reset(); + ASSERT_EQ(512, mem_tracker->consumption()); + + // Allocate beyond allowed consumption. This should still go + // through, since enforce_limit is false. + allocated = arena.AllocateBytes(1024); + ASSERT_TRUE(allocated); + + ASSERT_EQ(1536, mem_tracker->consumption()); +} + +TEST(TestArena, TestMemoryTrackingEnforced) { + shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker"); + shared_ptr<MemoryTrackingBufferAllocator> allocator( + new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker, + // enforce limit + true)); + MemoryTrackingArena arena(256, allocator); + ASSERT_EQ(256, mem_tracker->consumption()); + void *allocated = arena.AllocateBytes(256); + ASSERT_TRUE(allocated); + ASSERT_EQ(256, mem_tracker->consumption()); + allocated = arena.AllocateBytes(1024); + ASSERT_FALSE(allocated); + ASSERT_EQ(256, mem_tracker->consumption()); +} + +TEST(TestArena, TestSTLAllocator) { + Arena a(256); + typedef vector<int, ArenaAllocator<int, false> > ArenaVector; + ArenaAllocator<int, false> alloc(&a); + ArenaVector v(alloc); + for (int i = 0; i < 10000; i++) { + v.push_back(i); + } + for (int i = 0; i < 10000; i++) { + ASSERT_EQ(i, v[i]); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/arena.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/arena.cc b/be/src/kudu/util/memory/arena.cc new file mode 100644 index 0000000..b580dbc --- /dev/null +++ b/be/src/kudu/util/memory/arena.cc @@ -0,0 +1,167 @@ +// Copyright 2010 Google Inc. All Rights Reserved +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +// +#include "kudu/util/memory/arena.h" + +#include <algorithm> +#include <memory> +#include <mutex> + +using std::min; +using std::unique_ptr; + +namespace kudu { + +template <bool THREADSAFE> +const size_t ArenaBase<THREADSAFE>::kMinimumChunkSize = 16; + +// The max size of our allocations is set to this magic number +// corresponding to 127 tcmalloc pages (each being 8KB). tcmalloc +// internally keeps a free-list of spans up to this size. Larger +// allocations have to go through a linear search through free +// space, which can get quite slow in a fragmented heap. +// +// See the definition of kMaxPages in tcmalloc/src/common.h +// as well as https://github.com/gperftools/gperftools/issues/535 +// for a description of the performance issue. +constexpr int kMaxTcmallocFastAllocation = 8192 * 127; + +template <bool THREADSAFE> +ArenaBase<THREADSAFE>::ArenaBase(BufferAllocator* buffer_allocator, + size_t initial_buffer_size) + : buffer_allocator_(buffer_allocator), + max_buffer_size_(kMaxTcmallocFastAllocation), + arena_footprint_(0) { + AddComponent(CHECK_NOTNULL(NewComponent(initial_buffer_size, 0))); +} + +template <bool THREADSAFE> +ArenaBase<THREADSAFE>::ArenaBase(size_t initial_buffer_size) + : ArenaBase<THREADSAFE>(HeapBufferAllocator::Get(), + initial_buffer_size) { +} + +template <bool THREADSAFE> +void ArenaBase<THREADSAFE>::SetMaxBufferSize(size_t size) { + DCHECK_LE(size, kMaxTcmallocFastAllocation); + max_buffer_size_ = size; +} + +template <bool THREADSAFE> +void *ArenaBase<THREADSAFE>::AllocateBytesFallback(const size_t size, const size_t align) { + std::lock_guard<mutex_type> lock(component_lock_); + + // It's possible another thread raced with us and already allocated + // a new component, in which case we should try the "fast path" again. + Component* cur = AcquireLoadCurrent(); + void * result = cur->AllocateBytesAligned(size, align); + if (PREDICT_FALSE(result != nullptr)) return result; + + // Really need to allocate more space. + size_t next_component_size = min(2 * cur->size(), max_buffer_size_); + // But, allocate enough, even if the request is large. In this case, + // we might violate the max_buffer_size_ bound. + if (next_component_size < size) { + next_component_size = size; + } + // If soft quota is exhausted we will only get the "minimal" amount of memory + // we ask for. In this case if we always use "size" as minimal, we may degrade + // to allocating a lot of tiny components, one for each string added to the + // arena. This would be very inefficient, so let's first try something between + // "size" and "next_component_size". If it fails due to hard quota being + // exhausted, we'll fall back to using "size" as minimal. + size_t minimal = (size + next_component_size) / 2; + CHECK_LE(size, minimal); + CHECK_LE(minimal, next_component_size); + // Now, just make sure we can actually get the memory. + Component* component = NewComponent(next_component_size, minimal); + if (component == nullptr) { + component = NewComponent(next_component_size, size); + } + if (!component) return nullptr; + + // Now, must succeed. The component has at least 'size' bytes. + result = component->AllocateBytesAligned(size, align); + CHECK(result != nullptr); + + // Now add it to the arena. 
+ AddComponent(component); + + return result; +} + +template <bool THREADSAFE> +typename ArenaBase<THREADSAFE>::Component* ArenaBase<THREADSAFE>::NewComponent( + size_t requested_size, + size_t minimum_size) { + Buffer* buffer = buffer_allocator_->BestEffortAllocate(requested_size, + minimum_size); + if (buffer == nullptr) return nullptr; + + CHECK_EQ(reinterpret_cast<uintptr_t>(buffer->data()) & (16 - 1), 0) + << "Components should be 16-byte aligned: " << buffer->data(); + + ASAN_POISON_MEMORY_REGION(buffer->data(), buffer->size()); + + return new Component(buffer); +} + +// LOCKING: component_lock_ must be held by the current thread. +template <bool THREADSAFE> +void ArenaBase<THREADSAFE>::AddComponent(ArenaBase::Component *component) { + ReleaseStoreCurrent(component); + arena_.push_back(unique_ptr<Component>(component)); + arena_footprint_ += component->size(); +} + +template <bool THREADSAFE> +void ArenaBase<THREADSAFE>::Reset() { + std::lock_guard<mutex_type> lock(component_lock_); + + if (PREDICT_FALSE(arena_.size() > 1)) { + unique_ptr<Component> last = std::move(arena_.back()); + arena_.clear(); + arena_.emplace_back(std::move(last)); + ReleaseStoreCurrent(arena_[0].get()); + } + arena_.back()->Reset(); + arena_footprint_ = arena_.back()->size(); + +#ifndef NDEBUG + // In debug mode release the last component too for (hopefully) better + // detection of memory-related bugs (invalid shallow copies, etc.). + size_t last_size = arena_.back()->size(); + arena_.clear(); + AddComponent(CHECK_NOTNULL(NewComponent(last_size, 0))); + arena_footprint_ = 0; +#endif +} + +template <bool THREADSAFE> +size_t ArenaBase<THREADSAFE>::memory_footprint() const { + std::lock_guard<mutex_type> lock(component_lock_); + return arena_footprint_; +} + +// Explicit instantiation. +template class ArenaBase<true>; +template class ArenaBase<false>; + + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/arena.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/arena.h b/be/src/kudu/util/memory/arena.h new file mode 100644 index 0000000..6d9843b --- /dev/null +++ b/be/src/kudu/util/memory/arena.h @@ -0,0 +1,501 @@ +// Copyright 2010 Google Inc. All Rights Reserved +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// +// Memory arena for variable-length datatypes and STL collections. 
+ +#ifndef KUDU_UTIL_MEMORY_ARENA_H_ +#define KUDU_UTIL_MEMORY_ARENA_H_ + +#include <algorithm> +#include <cstddef> +#include <cstdint> +#include <cstring> +#include <memory> +#include <new> +#include <ostream> +#include <vector> + +#include <boost/signals2/dummy_mutex.hpp> +#include <glog/logging.h> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/dynamic_annotations.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/stringpiece.h" +#include "kudu/util/alignment.h" +#include "kudu/util/locks.h" +#include "kudu/util/memory/memory.h" +#include "kudu/util/mutex.h" +#include "kudu/util/slice.h" + +namespace kudu { + +template<bool THREADSAFE> struct ArenaTraits; + +template <> struct ArenaTraits<true> { + typedef Atomic32 offset_type; + typedef Mutex mutex_type; + typedef simple_spinlock spinlock_type; +}; + +template <> struct ArenaTraits<false> { + typedef uint32_t offset_type; + // For non-threadsafe, we don't need any real locking. + typedef boost::signals2::dummy_mutex mutex_type; + typedef boost::signals2::dummy_mutex spinlock_type; +}; + +// A helper class for storing variable-length blobs (e.g. strings). Once a blob +// is added to the arena, its address stays fixed. No reallocation happens. +// Instead, the arena keeps a list of buffers. When it needs to grow, it +// allocates a new buffer. Each subsequent buffer is 2x larger than its +// predecessor, until the maximum specified buffer size is reached. +// The buffers are furnished by a designated allocator. +// +// This class is thread-safe with the fast path lock-free. +template <bool THREADSAFE> +class ArenaBase { + public: + // Arenas are required to have a minimum size of at least this amount. + static const size_t kMinimumChunkSize; + + // Creates a new arena, with a single buffer of size up to initial_buffer_size + // and a maximum capacity (i.e. the total size of all buffers) + // possibly limited by the buffer allocator. The allocator might cap the + // initial allocation request arbitrarily (down to zero). As a consequence, + // arena construction never fails due to OOM. + // + // Calls to AllocateBytes() will then give out bytes from the working buffer + // until it is exhausted. Then, a subsequent working buffer will be allocated. + // The size of the next buffer is normally 2x the size of the previous buffer. + // It might be capped by the allocator, or by the max_buffer_size of the Arena, + // settable by SetMaxBufferSize below. + // + // The default maximum buffer size is ~1MB. See 'SetMaxBufferSize' for details + // on when you would want to configure this differently. + ArenaBase(BufferAllocator* buffer_allocator, + size_t initial_buffer_size); + + // Creates an arena using a default (heap) allocator. + explicit ArenaBase(size_t initial_buffer_size); + + // Set the maximum buffer size allocated for this arena. + // The maximum buffer size allowed is slightly less than 1MB (8192 * 127 bytes). + // + // Consider the following pros/cons of large buffer sizes: + // + // Pros: + // - Fewer heap allocations if the arena will hold a lot of data. + // (hence better allocation performance out of the arena) + // - Better page locality for objects allocated out of the same arena, + // especially if huge pages are in use. + // - Less internal fragmentation at the "end" of each buffer if the + // size of allocations from the arena is close to the size of the + // buffer. 
For example, with a 128KB max buffer size and 65KB + // allocations, we will only be able to make one allocation from + // each buffer and waste nearly 50% of memory. + // Cons: + // - Larger heap allocations may be more difficult to fulfill if the + // heap is fragmented. + // + // Overall, if you aren't sure, just leave it at the default. + // + // NOTE: this method is not thread-safe, even in the thread-safe variant. + // It is expected to be called only immediately after constructing the + // Arena instance, before making any allocations. + void SetMaxBufferSize(size_t size); + + // Adds content of the specified Slice to the arena, and returns a + // pointer to it. The pointer is guaranteed to remain valid during the + // lifetime of the arena. The Slice object itself is not copied. The + // size information is not stored. + // (Normal use case is that the caller already has an array of Slices, + // where it keeps these pointers together with size information). + // If this request would make the arena grow and the allocator denies that, + // returns NULL and leaves the arena unchanged. + uint8_t *AddSlice(const Slice& value); + + // Same as above, but for a raw pointer and length rather than a Slice. + void * AddBytes(const void *data, size_t len); + + // Handy wrapper for placement-new. + // + // This ensures that the returned object is properly aligned based on + // alignof(T). + template<class T, typename ... Args> + T *NewObject(Args&&... args); + + // Relocate the given Slice into the arena, setting 'dst' and + // returning true if successful. + // It is legal for 'dst' to be a pointer to 'src'. + // See AddSlice above for detail on memory lifetime. + bool RelocateSlice(const Slice &src, Slice *dst); + + // Similar to the above, but for StringPiece. + bool RelocateStringPiece(const StringPiece& src, StringPiece* sp); + + // Reserves a blob of the specified size in the arena, and returns a pointer + // to it. The caller can then fill the allocated memory. The pointer is + // guaranteed to remain valid during the lifetime of the arena. + // If this request would make the arena grow and the allocator denies that, + // returns NULL and leaves the arena unchanged. + void* AllocateBytes(const size_t size) { + return AllocateBytesAligned(size, 1); + } + + // Allocate bytes, ensuring a specified alignment. + // NOTE: alignment MUST be a power of two, or else this will break. + void* AllocateBytesAligned(const size_t size, const size_t alignment); + + // Removes all data from the arena. (Invalidates all pointers returned by + // AddSlice and AllocateBytes). Does not cause memory allocation. + // May reduce memory footprint, as it discards all allocated buffers but + // the last one. + // Unless allocations exceed max_buffer_size, repeatedly filling and + // resetting the arena normally leads to a quickly settling memory footprint + // and no further buffer allocations, as the arena keeps reusing a single + // large buffer. + void Reset(); + + // Returns the memory footprint of this arena, in bytes, defined as a sum of + // all buffer sizes. Always greater than or equal to the total number of + // bytes allocated out of the arena. + size_t memory_footprint() const; + + private: + typedef typename ArenaTraits<THREADSAFE>::mutex_type mutex_type; + // Encapsulates a single buffer in the arena. 
+ class Component; + + // Fallback for AllocateBytes non-fast-path + void* AllocateBytesFallback(const size_t size, const size_t align); + + Component* NewComponent(size_t requested_size, size_t minimum_size); + void AddComponent(Component *component); + + // Load the current component, with "Acquire" semantics (see atomicops.h) + // if the arena is meant to be thread-safe. + inline Component* AcquireLoadCurrent() { + if (THREADSAFE) { + return reinterpret_cast<Component*>( + base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(¤t_))); + } else { + return current_; + } + } + + // Store the current component, with "Release" semantics (see atomicops.h) + // if the arena is meant to be thread-safe. + inline void ReleaseStoreCurrent(Component* c) { + if (THREADSAFE) { + base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(¤t_), + reinterpret_cast<AtomicWord>(c)); + } else { + current_ = c; + } + } + + BufferAllocator* const buffer_allocator_; + std::vector<std::unique_ptr<Component> > arena_; + + // The current component to allocate from. + // Use AcquireLoadCurrent and ReleaseStoreCurrent to load/store. + Component* current_; + size_t max_buffer_size_; + size_t arena_footprint_; + + // Lock covering 'slow path' allocation, when new components are + // allocated and added to the arena's list. Also covers any other + // mutation of the component data structure (eg Reset). + mutable mutex_type component_lock_; + + DISALLOW_COPY_AND_ASSIGN(ArenaBase); +}; + +// STL-compliant allocator, for use with hash_maps and other structures +// which share lifetime with an Arena. Enables memory control and improves +// performance. +template<class T, bool THREADSAFE> class ArenaAllocator { + public: + typedef T value_type; + typedef size_t size_type; + typedef ptrdiff_t difference_type; + + typedef T* pointer; + typedef const T* const_pointer; + typedef T& reference; + typedef const T& const_reference; + pointer index(reference r) const { return &r; } + const_pointer index(const_reference r) const { return &r; } + size_type max_size() const { return size_t(-1) / sizeof(T); } + + explicit ArenaAllocator(ArenaBase<THREADSAFE>* arena) : arena_(arena) { + CHECK_NOTNULL(arena_); + } + + ~ArenaAllocator() { } + + pointer allocate(size_type n, std::allocator<void>::const_pointer /*hint*/ = 0) { + return reinterpret_cast<T*>(arena_->AllocateBytes(n * sizeof(T))); + } + + void deallocate(pointer p, size_type n) {} + + void construct(pointer p, const T& val) { + new(reinterpret_cast<void*>(p)) T(val); + } + + void destroy(pointer p) { p->~T(); } + + template<class U> struct rebind { + typedef ArenaAllocator<U, THREADSAFE> other; + }; + + template<class U, bool TS> ArenaAllocator(const ArenaAllocator<U, TS>& other) + : arena_(other.arena()) { } + + template<class U, bool TS> bool operator==(const ArenaAllocator<U, TS>& other) const { + return arena_ == other.arena(); + } + + template<class U, bool TS> bool operator!=(const ArenaAllocator<U, TS>& other) const { + return arena_ != other.arena(); + } + + ArenaBase<THREADSAFE> *arena() const { + return arena_; + } + + private: + + ArenaBase<THREADSAFE>* arena_; +}; + + +class Arena : public ArenaBase<false> { + public: + explicit Arena(size_t initial_buffer_size) : + ArenaBase<false>(initial_buffer_size) + {} +}; + +class ThreadSafeArena : public ArenaBase<true> { + public: + explicit ThreadSafeArena(size_t initial_buffer_size) : + ArenaBase<true>(initial_buffer_size) + {} +}; + +// Arena implementation that is integrated with MemTracker in order to +// track 
heap-allocated space consumed by the arena. + +class MemoryTrackingArena : public ArenaBase<false> { + public: + + MemoryTrackingArena( + size_t initial_buffer_size, + const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator) + : ArenaBase<false>(tracking_allocator.get(), initial_buffer_size), + tracking_allocator_(tracking_allocator) {} + + ~MemoryTrackingArena() { + } + + private: + + // This is required in order for the Arena to survive even after the tablet is + // shut down, e.g., in the case of scanners that are still running + // (see tablet_server-test.cc). + std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_; +}; + +class ThreadSafeMemoryTrackingArena : public ArenaBase<true> { + public: + + ThreadSafeMemoryTrackingArena( + size_t initial_buffer_size, + const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator) + : ArenaBase<true>(tracking_allocator.get(), initial_buffer_size), + tracking_allocator_(tracking_allocator) {} + + ~ThreadSafeMemoryTrackingArena() { + } + + private: + + // See comment in MemoryTrackingArena above. + std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_; +}; + +// Implementation of inline and template methods + +template<bool THREADSAFE> +class ArenaBase<THREADSAFE>::Component { + public: + explicit Component(Buffer* buffer) + : buffer_(buffer), + data_(static_cast<uint8_t*>(buffer->data())), + offset_(0), + size_(buffer->size()) {} + + // Tries to reserve space in this component. Returns the pointer to the + // reserved space if successful; NULL on failure (if there's no more room). + uint8_t* AllocateBytes(const size_t size) { + return AllocateBytesAligned(size, 1); + } + + uint8_t *AllocateBytesAligned(const size_t size, const size_t alignment); + + size_t size() const { return size_; } + void Reset() { + ASAN_POISON_MEMORY_REGION(data_, size_); + offset_ = 0; + } + + private: + // Mark the given range unpoisoned in ASAN. + // This is a no-op in a non-ASAN build. + void AsanUnpoison(const void* addr, size_t size); + + gscoped_ptr<Buffer> buffer_; + uint8_t* const data_; + typename ArenaTraits<THREADSAFE>::offset_type offset_; + const size_t size_; + +#ifdef ADDRESS_SANITIZER + // Lock used around unpoisoning memory when ASAN is enabled. + // ASAN does not support concurrent unpoison calls that may overlap a particular + // memory word (8 bytes). + typedef typename ArenaTraits<THREADSAFE>::spinlock_type spinlock_type; + spinlock_type asan_lock_; +#endif + DISALLOW_COPY_AND_ASSIGN(Component); +}; + + +// Thread-safe implementation +template <> +inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned( + const size_t size, const size_t alignment) { + // Special case check the allowed alignments. Currently, we only ensure + // the allocated buffer components are 16-byte aligned, and the code path + // doesn't support larger alignment. 
+ DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || + alignment == 8 || alignment == 16) + << "bad alignment: " << alignment; + retry: + Atomic32 offset = Acquire_Load(&offset_); + + Atomic32 aligned = KUDU_ALIGN_UP(offset, alignment); + Atomic32 new_offset = aligned + size; + + if (PREDICT_TRUE(new_offset <= size_)) { + bool success = Acquire_CompareAndSwap(&offset_, offset, new_offset) == offset; + if (PREDICT_TRUE(success)) { + AsanUnpoison(data_ + aligned, size); + return data_ + aligned; + } else { + // Raced with another allocator. + goto retry; + } + } else { + return NULL; + } +} + +// Non-thread-safe implementation +template <> +inline uint8_t *ArenaBase<false>::Component::AllocateBytesAligned( + const size_t size, const size_t alignment) { + DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || + alignment == 8 || alignment == 16) + << "bad alignment: " << alignment; + size_t aligned = KUDU_ALIGN_UP(offset_, alignment); + uint8_t* destination = data_ + aligned; + size_t save_offset = offset_; + offset_ = aligned + size; + if (PREDICT_TRUE(offset_ <= size_)) { + AsanUnpoison(data_ + aligned, size); + return destination; + } else { + offset_ = save_offset; + return NULL; + } +} + +template <bool THREADSAFE> +inline void ArenaBase<THREADSAFE>::Component::AsanUnpoison(const void* addr, size_t size) { +#ifdef ADDRESS_SANITIZER + std::lock_guard<spinlock_type> l(asan_lock_); + ASAN_UNPOISON_MEMORY_REGION(addr, size); +#endif +} + +// Fast-path allocation should get inlined; it falls back to a non-inlined +// function call on allocation failure. +template <bool THREADSAFE> +inline void *ArenaBase<THREADSAFE>::AllocateBytesAligned(const size_t size, const size_t align) { + void* result = AcquireLoadCurrent()->AllocateBytesAligned(size, align); + if (PREDICT_TRUE(result != NULL)) return result; + return AllocateBytesFallback(size, align); +} + +template <bool THREADSAFE> +inline uint8_t* ArenaBase<THREADSAFE>::AddSlice(const Slice& value) { + return reinterpret_cast<uint8_t *>(AddBytes(value.data(), value.size())); +} + +template <bool THREADSAFE> +inline void *ArenaBase<THREADSAFE>::AddBytes(const void *data, size_t len) { + void* destination = AllocateBytes(len); + if (destination == NULL) return NULL; + memcpy(destination, data, len); + return destination; +} + +template <bool THREADSAFE> +inline bool ArenaBase<THREADSAFE>::RelocateSlice(const Slice &src, Slice *dst) { + void* destination = AllocateBytes(src.size()); + if (destination == NULL) return false; + memcpy(destination, src.data(), src.size()); + *dst = Slice(reinterpret_cast<uint8_t *>(destination), src.size()); + return true; +} + + +template <bool THREADSAFE> +inline bool ArenaBase<THREADSAFE>::RelocateStringPiece(const StringPiece& src, StringPiece* sp) { + Slice slice(src.data(), src.size()); + if (!RelocateSlice(slice, &slice)) return false; + *sp = StringPiece(reinterpret_cast<const char*>(slice.data()), slice.size()); + return true; +} + +template<bool THREADSAFE> +template<class T, class ... Args> +inline T *ArenaBase<THREADSAFE>::NewObject(Args&&... 
args) { + void *mem = AllocateBytesAligned(sizeof(T), alignof(T)); + if (mem == NULL) throw std::bad_alloc(); + return new (mem) T(std::forward<Args>(args)...); +} + +} // namespace kudu + +#endif // KUDU_UTIL_MEMORY_ARENA_H_ http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/memory.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/memory.cc b/be/src/kudu/util/memory/memory.cc new file mode 100644 index 0000000..b3964df --- /dev/null +++ b/be/src/kudu/util/memory/memory.cc @@ -0,0 +1,339 @@ +// Copyright 2010 Google Inc. All Rights Reserved +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "kudu/util/memory/memory.h" + +#include <mm_malloc.h> + +#include <algorithm> +#include <cstdlib> +#include <cstring> + +#include <gflags/gflags.h> + +#include "kudu/util/alignment.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/memory/overwrite.h" +#include "kudu/util/mem_tracker.h" + +using std::copy; +using std::min; + +// TODO(onufry) - test whether the code still tests OK if we set this to true, +// or remove this code and add a test that Google allocator does not change its +// contract - 16-aligned in -c opt and %16 == 8 in debug. +DEFINE_bool(allocator_aligned_mode, false, + "Use 16-byte alignment instead of 8-byte, " + "unless explicitly specified otherwise - to boost SIMD"); +TAG_FLAG(allocator_aligned_mode, hidden); + +namespace kudu { + +namespace { +static char dummy_buffer[0] = {}; +} + +Buffer::~Buffer() { +#if !defined(NDEBUG) && !defined(ADDRESS_SANITIZER) + // "unrolling" the string "BAD" makes for a much more efficient + // OverwriteWithPattern call in debug mode, so we can keep this + // useful bit of code without tests going slower! + // + // In ASAN mode, we don't bother with this, because when we free the memory, ASAN will + // prevent us from accessing it anyway. + OverwriteWithPattern(reinterpret_cast<char*>(data_), size_, + "BADBADBADBADBADBADBADBADBADBADBAD" + "BADBADBADBADBADBADBADBADBADBADBAD" + "BADBADBADBADBADBADBADBADBADBADBAD"); +#endif + if (allocator_ != nullptr) allocator_->FreeInternal(this); +} + +void BufferAllocator::LogAllocation(size_t requested, + size_t minimal, + Buffer* buffer) { + if (buffer == nullptr) { + LOG(WARNING) << "Memory allocation failed. " + << "Number of bytes requested: " << requested + << ", minimal: " << minimal; + return; + } + if (buffer->size() < requested) { + LOG(WARNING) << "Memory allocation was shorter than requested. 
" + << "Number of bytes requested to allocate: " << requested + << ", minimal: " << minimal + << ", and actually allocated: " << buffer->size(); + } +} + +HeapBufferAllocator::HeapBufferAllocator() + : aligned_mode_(FLAGS_allocator_aligned_mode) { +} + +Buffer* HeapBufferAllocator::AllocateInternal( + const size_t requested, + const size_t minimal, + BufferAllocator* const originator) { + DCHECK_LE(minimal, requested); + void* data; + size_t attempted = requested; + while (true) { + data = (attempted == 0) ? &dummy_buffer[0] : Malloc(attempted); + if (data != nullptr) { + return CreateBuffer(data, attempted, originator); + } + if (attempted == minimal) return nullptr; + attempted = minimal + (attempted - minimal - 1) / 2; + } +} + +bool HeapBufferAllocator::ReallocateInternal( + const size_t requested, + const size_t minimal, + Buffer* const buffer, + BufferAllocator* const originator) { + DCHECK_LE(minimal, requested); + void* data; + size_t attempted = requested; + while (true) { + if (attempted == 0) { + if (buffer->size() > 0) free(buffer->data()); + data = &dummy_buffer[0]; + } else { + if (buffer->size() > 0) { + data = Realloc(buffer->data(), buffer->size(), attempted); + } else { + data = Malloc(attempted); + } + } + if (data != nullptr) { + UpdateBuffer(data, attempted, buffer); + return true; + } + if (attempted == minimal) return false; + attempted = minimal + (attempted - minimal - 1) / 2; + } +} + +void HeapBufferAllocator::FreeInternal(Buffer* buffer) { + if (buffer->size() > 0) free(buffer->data()); +} + +void* HeapBufferAllocator::Malloc(size_t size) { + if (aligned_mode_) { + void* data; + if (posix_memalign(&data, 16, KUDU_ALIGN_UP(size, 16))) { + return nullptr; + } + return data; + } else { + return malloc(size); + } +} + +void* HeapBufferAllocator::Realloc(void* previous_data, size_t previous_size, + size_t new_size) { + if (aligned_mode_) { + void* data = Malloc(new_size); + if (data) { +// NOTE(ptab): We should use realloc here to avoid memmory coping, +// but it doesn't work on memory allocated by posix_memalign(...). +// realloc reallocates the memory but doesn't preserve the content. +// TODO(ptab): reiterate after some time to check if it is fixed (tcmalloc ?) + memcpy(data, previous_data, min(previous_size, new_size)); + free(previous_data); + return data; + } else { + return nullptr; + } + } else { + return realloc(previous_data, new_size); + } +} + +Buffer* ClearingBufferAllocator::AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) { + Buffer* buffer = DelegateAllocate(delegate_, requested, minimal, + originator); + if (buffer != nullptr) memset(buffer->data(), 0, buffer->size()); + return buffer; +} + +bool ClearingBufferAllocator::ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) { + size_t offset = (buffer != nullptr ? buffer->size() : 0); + bool success = DelegateReallocate(delegate_, requested, minimal, buffer, + originator); + if (success && buffer->size() > offset) { + memset(static_cast<char*>(buffer->data()) + offset, 0, + buffer->size() - offset); + } + return success; +} + +void ClearingBufferAllocator::FreeInternal(Buffer* buffer) { + DelegateFree(delegate_, buffer); +} + +Buffer* MediatingBufferAllocator::AllocateInternal( + const size_t requested, + const size_t minimal, + BufferAllocator* const originator) { + // Allow the mediator to trim the request. 
+ size_t granted; + if (requested > 0) { + granted = mediator_->Allocate(requested, minimal); + if (granted < minimal) return nullptr; + } else { + granted = 0; + } + Buffer* buffer = DelegateAllocate(delegate_, granted, minimal, originator); + if (buffer == nullptr) { + mediator_->Free(granted); + } else if (buffer->size() < granted) { + mediator_->Free(granted - buffer->size()); + } + return buffer; +} + +bool MediatingBufferAllocator::ReallocateInternal( + const size_t requested, + const size_t minimal, + Buffer* const buffer, + BufferAllocator* const originator) { + // Allow the mediator to trim the request. Be conservative; assume that + // realloc may degenerate to malloc-memcpy-free. + size_t granted; + if (requested > 0) { + granted = mediator_->Allocate(requested, minimal); + if (granted < minimal) return false; + } else { + granted = 0; + } + size_t old_size = buffer->size(); + if (DelegateReallocate(delegate_, granted, minimal, buffer, originator)) { + mediator_->Free(granted - buffer->size() + old_size); + return true; + } else { + mediator_->Free(granted); + return false; + } +} + +void MediatingBufferAllocator::FreeInternal(Buffer* buffer) { + mediator_->Free(buffer->size()); + DelegateFree(delegate_, buffer); +} + +Buffer* MemoryStatisticsCollectingBufferAllocator::AllocateInternal( + const size_t requested, + const size_t minimal, + BufferAllocator* const originator) { + Buffer* buffer = DelegateAllocate(delegate_, requested, minimal, originator); + if (buffer != nullptr) { + memory_stats_collector_->AllocatedMemoryBytes(buffer->size()); + } else { + memory_stats_collector_->RefusedMemoryBytes(minimal); + } + return buffer; +} + +bool MemoryStatisticsCollectingBufferAllocator::ReallocateInternal( + const size_t requested, + const size_t minimal, + Buffer* const buffer, + BufferAllocator* const originator) { + const size_t old_size = buffer->size(); + bool outcome = DelegateReallocate(delegate_, requested, minimal, buffer, + originator); + if (buffer->size() > old_size) { + memory_stats_collector_->AllocatedMemoryBytes(buffer->size() - old_size); + } else if (buffer->size() < old_size) { + memory_stats_collector_->FreedMemoryBytes(old_size - buffer->size()); + } else if (!outcome && (minimal > buffer->size())) { + memory_stats_collector_->RefusedMemoryBytes(minimal - buffer->size()); + } + return outcome; +} + +void MemoryStatisticsCollectingBufferAllocator::FreeInternal(Buffer* buffer) { + DelegateFree(delegate_, buffer); + memory_stats_collector_->FreedMemoryBytes(buffer->size()); +} + +size_t MemoryTrackingBufferAllocator::Available() const { + return enforce_limit_ ? mem_tracker_->SpareCapacity() : std::numeric_limits<int64_t>::max(); +} + +bool MemoryTrackingBufferAllocator::TryConsume(int64_t bytes) { + // Calls TryConsume first, even if enforce_limit_ is false: this + // will cause mem_tracker_ to try to free up more memory by GCing. + if (!mem_tracker_->TryConsume(bytes)) { + if (enforce_limit_) { + return false; + } else { + // If enforce_limit_ is false, allocate memory anyway. 
+ mem_tracker_->Consume(bytes); + } + } + return true; +} + +Buffer* MemoryTrackingBufferAllocator::AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) { + if (TryConsume(requested)) { + Buffer* buffer = DelegateAllocate(delegate_, requested, requested, originator); + if (buffer == nullptr) { + mem_tracker_->Release(requested); + } else { + return buffer; + } + } + + if (TryConsume(minimal)) { + Buffer* buffer = DelegateAllocate(delegate_, minimal, minimal, originator); + if (buffer == nullptr) { + mem_tracker_->Release(minimal); + } + return buffer; + } + + return nullptr; +} + + +bool MemoryTrackingBufferAllocator::ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) { + LOG(FATAL) << "Not implemented"; + return false; +} + +void MemoryTrackingBufferAllocator::FreeInternal(Buffer* buffer) { + DelegateFree(delegate_, buffer); + mem_tracker_->Release(buffer->size()); +} + +} // namespace kudu
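Taken together, the tracking allocator in memory.cc and the arena from arena.h compose exactly as the tests in arena-test.cc exercise them. The following standalone sketch (an editorial illustration, not part of the commit) wires a limit-enforcing MemoryTrackingBufferAllocator into a MemoryTrackingArena; the tracker id string and the sizes are arbitrary.

#include <memory>

#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/memory/memory.h"

using kudu::HeapBufferAllocator;
using kudu::MemTracker;
using kudu::MemoryTrackingArena;
using kudu::MemoryTrackingBufferAllocator;

int main() {
  // Cap the tracker at 1KB. Passing enforce_limit = true makes the allocator
  // refuse allocations that would exceed the limit, rather than merely
  // recording them against the tracker.
  std::shared_ptr<MemTracker> tracker =
      MemTracker::CreateTracker(1024, "example-tracker");
  std::shared_ptr<MemoryTrackingBufferAllocator> allocator(
      new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), tracker,
                                        /* enforce_limit = */ true));

  // The arena immediately draws its 256-byte initial buffer from the tracker.
  MemoryTrackingArena arena(256, allocator);

  void* ok = arena.AllocateBytes(128);        // fits in the initial buffer
  void* too_big = arena.AllocateBytes(4096);  // would blow the 1KB limit

  // ok is non-NULL; too_big is NULL because the fallback path could not get
  // a new component past the enforced tracker limit.
  return (ok != nullptr && too_big == nullptr) ? 0 : 1;
}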