http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memcmpable_varint.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memcmpable_varint.h b/be/src/kudu/util/memcmpable_varint.h
new file mode 100644
index 0000000..c1ce071
--- /dev/null
+++ b/be/src/kudu/util/memcmpable_varint.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This is an alternate varint format, borrowed from sqlite4, that differs from the
+// varint in util/coding.h in that its serialized form can be compared with memcmp(),
+// yielding the same result as comparing the original integers.
+//
+// The serialized form also has the property that multiple such varints can be strung
+// together to form a composite key, which itself is memcmpable.
+//
+// See memcmpable_varint.cc for further description.
+
+#ifndef KUDU_UTIL_MEMCMPABLE_VARINT_H
+#define KUDU_UTIL_MEMCMPABLE_VARINT_H
+
+#include "kudu/util/faststring.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+void PutMemcmpableVarint64(faststring *dst, uint64_t value);
+
+// Standard Get... routines parse a value from the beginning of a Slice
+// and advance the slice past the parsed value.
+bool GetMemcmpableVarint64(Slice *input, uint64_t *value);
+
+} // namespace kudu
+
+#endif
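
A usage sketch, not part of the patch: the two declarations above are typically used as shown below. The function names come from this header; the surrounding scaffolding is illustrative only.

#include <cstdint>
#include <iostream>

#include "kudu/util/faststring.h"
#include "kudu/util/memcmpable_varint.h"
#include "kudu/util/slice.h"

// Illustrative sketch: encode two integers and compare their encodings.
void MemcmpableVarintSketch() {
  kudu::faststring a, b;
  kudu::PutMemcmpableVarint64(&a, 300);
  kudu::PutMemcmpableVarint64(&b, 70000);

  // Because the encoding is memcmp-able, a byte-wise comparison of the
  // serialized forms orders them the same way as the original integers.
  kudu::Slice ea(a.data(), a.size());
  kudu::Slice eb(b.data(), b.size());
  std::cout << "encoded(300) < encoded(70000): " << (ea.compare(eb) < 0) << "\n";

  // GetMemcmpableVarint64 parses from the front of a Slice and advances it.
  uint64_t decoded = 0;
  if (kudu::GetMemcmpableVarint64(&ea, &decoded)) {
    std::cout << "decoded: " << decoded << "\n";  // prints 300
  }
}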

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/arena-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/arena-test.cc b/be/src/kudu/util/memory/arena-test.cc
new file mode 100644
index 0000000..fc3a64d
--- /dev/null
+++ b/be/src/kudu/util/memory/arena-test.cc
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <thread>
+#include <vector>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/memory/memory.h"
+#include "kudu/util/mem_tracker.h"
+
+DEFINE_int32(num_threads, 16, "Number of threads to test");
+DEFINE_int32(allocs_per_thread, 10000, "Number of allocations each thread should do");
+DEFINE_int32(alloc_size, 4, "number of bytes in each allocation");
+
+namespace kudu {
+
+using std::shared_ptr;
+using std::thread;
+using std::vector;
+
+template<class ArenaType>
+static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
+  std::vector<void *> ptrs;
+  ptrs.reserve(FLAGS_allocs_per_thread);
+
+  char buf[FLAGS_alloc_size];
+  memset(buf, thread_index, FLAGS_alloc_size);
+
+  for (int i = 0; i < FLAGS_allocs_per_thread; i++) {
+    void *alloced = arena->AllocateBytes(FLAGS_alloc_size);
+    CHECK(alloced);
+    memcpy(alloced, buf, FLAGS_alloc_size);
+    ptrs.push_back(alloced);
+  }
+
+  for (void *p : ptrs) {
+    if (memcmp(buf, p, FLAGS_alloc_size) != 0) {
+      FAIL() << StringPrintf("overwritten pointer at %p", p);
+    }
+  }
+}
+
+// Non-templated function to forward to above -- simplifies thread creation
+static void AllocateThreadTSArena(ThreadSafeArena *arena, uint8_t thread_index) {
+  AllocateThread(arena, thread_index);
+}
+
+
+TEST(TestArena, TestSingleThreaded) {
+  Arena arena(128, 128);
+
+  AllocateThread(&arena, 0);
+}
+
+
+
+TEST(TestArena, TestMultiThreaded) {
+  CHECK(FLAGS_num_threads < 256);
+
+  ThreadSafeArena arena(1024, 1024);
+
+  vector<thread> threads;
+  for (uint8_t i = 0; i < FLAGS_num_threads; i++) {
+    threads.emplace_back(AllocateThreadTSArena, &arena, (uint8_t)i);
+  }
+
+  for (thread& thr : threads) {
+    thr.join();
+  }
+}
+
+TEST(TestArena, TestAlignment) {
+
+  ThreadSafeArena arena(1024, 1024);
+  for (int i = 0; i < 1000; i++) {
+    int alignment = 1 << (i % 5);
+
+    void *ret = arena.AllocateBytesAligned(5, alignment);
+    ASSERT_EQ(0, (uintptr_t)(ret) % alignment) <<
+      "failed to align on " << alignment << "b boundary: " <<
+      ret;
+  }
+}
+
+TEST(TestArena, TestObjectAlignment) {
+  struct MyStruct {
+    int64_t v;
+  };
+  Arena a(256, 256 * 1024);
+  // Allocate a junk byte to ensure that the next allocation isn't "accidentally" aligned.
+  a.AllocateBytes(1);
+  void* v = a.NewObject<MyStruct>();
+  ASSERT_EQ(reinterpret_cast<uintptr_t>(v) % alignof(MyStruct), 0);
+}
+
+
+// MemTrackers update their ancestors when consuming and releasing memory to compute
+// usage totals. However, the lifetimes of parent and child trackers can be different.
+// Validate that child trackers can still correctly update their parent stats even when
+// the parents go out of scope.
+TEST(TestArena, TestMemoryTrackerParentReferences) {
+  // Set up a parent and child MemTracker.
+  const string parent_id = "parent-id";
+  const string child_id = "child-id";
+  shared_ptr<MemTracker> child_tracker;
+  {
+    shared_ptr<MemTracker> parent_tracker = MemTracker::CreateTracker(1024, parent_id);
+    child_tracker = MemTracker::CreateTracker(-1, child_id, parent_tracker);
+    // Parent falls out of scope here. Should still be owned by the child.
+  }
+  shared_ptr<MemoryTrackingBufferAllocator> allocator(
+      new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), child_tracker));
+  MemoryTrackingArena arena(256, 1024, allocator);
+
+  // Try some child operations.
+  ASSERT_EQ(256, child_tracker->consumption());
+  void *allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(256, child_tracker->consumption());
+  allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(768, child_tracker->consumption());
+}
+
+TEST(TestArena, TestMemoryTrackingDontEnforce) {
+  shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker");
+  shared_ptr<MemoryTrackingBufferAllocator> allocator(
+      new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker));
+  MemoryTrackingArena arena(256, 1024, allocator);
+  ASSERT_EQ(256, mem_tracker->consumption());
+  void *allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(256, mem_tracker->consumption());
+  allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(768, mem_tracker->consumption());
+
+  // In DEBUG mode after Reset() the last component of an arena is
+  // cleared, but is then created again; in release mode, the last
+  // component is not cleared. In either case, after Reset()
+  // consumption() should equal the size of the last component which
+  // is 512 bytes.
+  arena.Reset();
+  ASSERT_EQ(512, mem_tracker->consumption());
+
+  // Allocate beyond allowed consumption. This should still go
+  // through, since enforce_limit is false.
+  allocated = arena.AllocateBytes(1024);
+  ASSERT_TRUE(allocated);
+
+  ASSERT_EQ(1536, mem_tracker->consumption());
+}
+
+TEST(TestArena, TestMemoryTrackingEnforced) {
+  shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker");
+  shared_ptr<MemoryTrackingBufferAllocator> allocator(
+      new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker,
+                                        // enforce limit
+                                        true));
+  MemoryTrackingArena arena(256, 1024, allocator);
+  ASSERT_EQ(256, mem_tracker->consumption());
+  void *allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(256, mem_tracker->consumption());
+  allocated = arena.AllocateBytes(1024);
+  ASSERT_FALSE(allocated);
+  ASSERT_EQ(256, mem_tracker->consumption());
+}
+
+TEST(TestArena, TestSTLAllocator) {
+  Arena a(256, 256 * 1024);
+  typedef vector<int, ArenaAllocator<int, false> > ArenaVector;
+  ArenaAllocator<int, false> alloc(&a);
+  ArenaVector v(alloc);
+  for (int i = 0; i < 10000; i++) {
+    v.push_back(i);
+  }
+  for (int i = 0; i < 10000; i++) {
+    ASSERT_EQ(i, v[i]);
+  }
+}
+
+} // namespace kudu
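
A brief sketch of where the consumption numbers asserted above come from (illustrative only; it reuses the classes added in this patch): the arena starts with one 256-byte component, the first 256-byte allocation fills it exactly, and the next allocation forces a doubled 512-byte component.

#include <memory>

#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/memory/memory.h"

// Illustrative sketch; mirrors TestMemoryTrackerParentReferences above.
void TrackedArenaSketch() {
  using kudu::HeapBufferAllocator;
  using kudu::MemTracker;
  using kudu::MemoryTrackingArena;
  using kudu::MemoryTrackingBufferAllocator;

  std::shared_ptr<MemTracker> parent = MemTracker::CreateTracker(1024, "parent-id");
  std::shared_ptr<MemTracker> child = MemTracker::CreateTracker(-1, "child-id", parent);

  std::shared_ptr<MemoryTrackingBufferAllocator> allocator(
      new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), child));
  MemoryTrackingArena arena(256, 1024, allocator);
  // child->consumption() == 256: the initial 256-byte component.

  arena.AllocateBytes(256);  // fills the first component; consumption stays 256
  arena.AllocateBytes(256);  // forces a new 512-byte component; consumption becomes 768
}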

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/arena.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/arena.cc b/be/src/kudu/util/memory/arena.cc
new file mode 100644
index 0000000..427072f
--- /dev/null
+++ b/be/src/kudu/util/memory/arena.cc
@@ -0,0 +1,161 @@
+// Copyright 2010 Google Inc.  All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+#include "kudu/util/memory/arena.h"
+
+#include <algorithm>
+#include <mutex>
+
+#include "kudu/util/debug-util.h"
+#include "kudu/util/flag_tags.h"
+
+using std::copy;
+using std::max;
+using std::min;
+using std::reverse;
+using std::sort;
+using std::swap;
+using std::unique_ptr;
+
+namespace kudu {
+
+template <bool THREADSAFE>
+const size_t ArenaBase<THREADSAFE>::kMinimumChunkSize = 16;
+
+template <bool THREADSAFE>
+ArenaBase<THREADSAFE>::ArenaBase(
+  BufferAllocator* const buffer_allocator,
+  size_t initial_buffer_size,
+  size_t max_buffer_size)
+    : buffer_allocator_(buffer_allocator),
+      max_buffer_size_(max_buffer_size),
+      arena_footprint_(0) {
+  AddComponent(CHECK_NOTNULL(NewComponent(initial_buffer_size, 0)));
+}
+
+template <bool THREADSAFE>
+ArenaBase<THREADSAFE>::ArenaBase(size_t initial_buffer_size, size_t max_buffer_size)
+    : buffer_allocator_(HeapBufferAllocator::Get()),
+      max_buffer_size_(max_buffer_size),
+      arena_footprint_(0) {
+  AddComponent(CHECK_NOTNULL(NewComponent(initial_buffer_size, 0)));
+}
+
+template <bool THREADSAFE>
+void *ArenaBase<THREADSAFE>::AllocateBytesFallback(const size_t size, const size_t align) {
+  std::lock_guard<mutex_type> lock(component_lock_);
+
+  // It's possible another thread raced with us and already allocated
+  // a new component, in which case we should try the "fast path" again
+  Component* cur = AcquireLoadCurrent();
+  void * result = cur->AllocateBytesAligned(size, align);
+  if (PREDICT_FALSE(result != nullptr)) return result;
+
+  // Really need to allocate more space.
+  size_t next_component_size = min(2 * cur->size(), max_buffer_size_);
+  // But, allocate enough, even if the request is large. In this case,
+  // might violate the max_element_size bound.
+  if (next_component_size < size) {
+    next_component_size = size;
+  }
+  // If soft quota is exhausted we will only get the "minimal" amount of memory
+  // we ask for. In this case if we always use "size" as minimal, we may degrade
+  // to allocating a lot of tiny components, one for each string added to the
+  // arena. This would be very inefficient, so let's first try something between
+  // "size" and "next_component_size". If it fails due to hard quota being
+  // exhausted, we'll fall back to using "size" as minimal.
+  size_t minimal = (size + next_component_size) / 2;
+  CHECK_LE(size, minimal);
+  CHECK_LE(minimal, next_component_size);
+  // Now, just make sure we can actually get the memory.
+  Component* component = NewComponent(next_component_size, minimal);
+  if (component == nullptr) {
+    component = NewComponent(next_component_size, size);
+  }
+  if (!component) return nullptr;
+
+  // Now, must succeed. The component has at least 'size' bytes.
+  result = component->AllocateBytesAligned(size, align);
+  CHECK(result != nullptr);
+
+  // Now add it to the arena.
+  AddComponent(component);
+
+  return result;
+}
+
+template <bool THREADSAFE>
+typename ArenaBase<THREADSAFE>::Component* ArenaBase<THREADSAFE>::NewComponent(
+  size_t requested_size,
+  size_t minimum_size) {
+  Buffer* buffer = buffer_allocator_->BestEffortAllocate(requested_size,
+                                                         minimum_size);
+  if (buffer == nullptr) return nullptr;
+
+  CHECK_EQ(reinterpret_cast<uintptr_t>(buffer->data()) & (16 - 1), 0)
+    << "Components should be 16-byte aligned: " << buffer->data();
+
+  ASAN_POISON_MEMORY_REGION(buffer->data(), buffer->size());
+
+  return new Component(buffer);
+}
+
+// LOCKING: component_lock_ must be held by the current thread.
+template <bool THREADSAFE>
+void ArenaBase<THREADSAFE>::AddComponent(ArenaBase::Component *component) {
+  ReleaseStoreCurrent(component);
+  arena_.push_back(unique_ptr<Component>(component));
+  arena_footprint_ += component->size();
+}
+
+template <bool THREADSAFE>
+void ArenaBase<THREADSAFE>::Reset() {
+  std::lock_guard<mutex_type> lock(component_lock_);
+
+  if (PREDICT_FALSE(arena_.size() > 1)) {
+    unique_ptr<Component> last = std::move(arena_.back());
+    arena_.clear();
+    arena_.emplace_back(std::move(last));
+    ReleaseStoreCurrent(arena_[0].get());
+  }
+  arena_.back()->Reset();
+  arena_footprint_ = arena_.back()->size();
+
+#ifndef NDEBUG
+  // In debug mode release the last component too for (hopefully) better
+  // detection of memory-related bugs (invalid shallow copies, etc.).
+  size_t last_size = arena_.back()->size();
+  arena_.clear();
+  AddComponent(CHECK_NOTNULL(NewComponent(last_size, 0)));
+  arena_footprint_ = 0;
+#endif
+}
+
+template <bool THREADSAFE>
+size_t ArenaBase<THREADSAFE>::memory_footprint() const {
+  std::lock_guard<mutex_type> lock(component_lock_);
+  return arena_footprint_;
+}
+
+// Explicit instantiation.
+template class ArenaBase<true>;
+template class ArenaBase<false>;
+
+
+}  // namespace kudu
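
A standalone restatement of the sizing arithmetic in AllocateBytesFallback() above, as a sketch; the struct and function names here are invented for illustration and are not part of the patch.

#include <algorithm>
#include <cstddef>

// Hypothetical helper mirroring the component-growth policy above: the next
// component normally doubles the current one (capped at max_buffer_size) but
// is never smaller than the request, and the allocator is first asked to
// settle for a "minimal" size halfway between the request and that target.
struct NextComponentSizing {
  size_t requested;  // corresponds to 'requested_size' in NewComponent()
  size_t minimal;    // corresponds to 'minimum_size' in NewComponent()
};

NextComponentSizing ComputeNextComponentSizing(size_t cur_component_size,
                                               size_t max_buffer_size,
                                               size_t alloc_size) {
  size_t next = std::min(2 * cur_component_size, max_buffer_size);
  if (next < alloc_size) {
    next = alloc_size;  // large requests may exceed max_buffer_size
  }
  return NextComponentSizing{next, (alloc_size + next) / 2};
}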

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/arena.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/arena.h b/be/src/kudu/util/memory/arena.h
new file mode 100644
index 0000000..1c98564
--- /dev/null
+++ b/be/src/kudu/util/memory/arena.h
@@ -0,0 +1,473 @@
+// Copyright 2010 Google Inc.  All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+//
+// Memory arena for variable-length datatypes and STL collections.
+
+#ifndef KUDU_UTIL_MEMORY_ARENA_H_
+#define KUDU_UTIL_MEMORY_ARENA_H_
+
+#include <boost/signals2/dummy_mutex.hpp>
+#include <glog/logging.h>
+#include <memory>
+#include <mutex>
+#include <new>
+#include <stddef.h>
+#include <string.h>
+#include <vector>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/logging-inl.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/memory/memory.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/slice.h"
+
+using std::allocator;
+
+namespace kudu {
+
+template<bool THREADSAFE> struct ArenaTraits;
+
+template <> struct ArenaTraits<true> {
+  typedef Atomic32 offset_type;
+  typedef Mutex mutex_type;
+  typedef simple_spinlock spinlock_type;
+};
+
+template <> struct ArenaTraits<false> {
+  typedef uint32_t offset_type;
+  // For non-threadsafe, we don't need any real locking.
+  typedef boost::signals2::dummy_mutex mutex_type;
+  typedef boost::signals2::dummy_mutex spinlock_type;
+};
+
+// A helper class for storing variable-length blobs (e.g. strings). Once a blob
+// is added to the arena, its index stays fixed. No reallocation happens.
+// Instead, the arena keeps a list of buffers. When it needs to grow, it
+// allocates a new buffer. Each subsequent buffer is 2x larger than its
+// predecessor, until the maximum specified buffer size is reached.
+// The buffers are furnished by a designated allocator.
+//
+// This class is thread-safe with the fast path lock-free.
+template <bool THREADSAFE>
+class ArenaBase {
+ public:
+  // Arenas are required to have a minimum size of at least this amount.
+  static const size_t kMinimumChunkSize;
+
+  // Creates a new arena, with a single buffer of size up-to
+  // initial_buffer_size, upper size limit for later-allocated buffers capped
+  // at max_buffer_size, and maximum capacity (i.e. total sizes of all buffers)
+  // possibly limited by the buffer allocator. The allocator might cap the
+  // initial allocation request arbitrarily (down to zero). As a consequence,
+  // arena construction never fails due to OOM.
+  //
+  // Calls to AllocateBytes() will then give out bytes from the working buffer
+  // until it is exhausted. Then, a subsequent working buffer will be allocated.
+  // The size of the next buffer is normally 2x the size of the previous buffer.
+  // It might be capped by the allocator, or by the max_buffer_size parameter.
+  ArenaBase(BufferAllocator* const buffer_allocator,
+            size_t initial_buffer_size,
+            size_t max_buffer_size);
+
+  // Creates an arena using a default (heap) allocator with unbounded capacity.
+  // Discretion advised.
+  ArenaBase(size_t initial_buffer_size, size_t max_buffer_size);
+
+  // Adds content of the specified Slice to the arena, and returns a
+  // pointer to it. The pointer is guaranteed to remain valid during the
+  // lifetime of the arena. The Slice object itself is not copied. The
+  // size information is not stored.
+  // (Normal use case is that the caller already has an array of Slices,
+  // where it keeps these pointers together with size information).
+  // If this request would make the arena grow and the allocator denies that,
+  // returns NULL and leaves the arena unchanged.
+  uint8_t *AddSlice(const Slice& value);
+
+  // Same as above.
+  void * AddBytes(const void *data, size_t len);
+
+  // Handy wrapper for placement-new.
+  //
+  // This ensures that the returned object is properly aligned based on
+  // alignof(T).
+  template<class T, typename ... Args>
+  T *NewObject(Args&&... args);
+
+  // Relocate the given Slice into the arena, setting 'dst' and
+  // returning true if successful.
+  // It is legal for 'dst' to be a pointer to 'src'.
+  // See AddSlice above for detail on memory lifetime.
+  bool RelocateSlice(const Slice &src, Slice *dst);
+
+  // Similar to the above, but for StringPiece.
+  bool RelocateStringPiece(const StringPiece& src, StringPiece* sp);
+
+  // Reserves a blob of the specified size in the arena, and returns a pointer
+  // to it. The caller can then fill the allocated memory. The pointer is
+  // guaranteed to remain valid during the lifetime of the arena.
+  // If this request would make the arena grow and the allocator denies that,
+  // returns NULL and leaves the arena unchanged.
+  void* AllocateBytes(const size_t size) {
+    return AllocateBytesAligned(size, 1);
+  }
+
+  // Allocate bytes, ensuring a specified alignment.
+  // NOTE: alignment MUST be a power of two, or else this will break.
+  void* AllocateBytesAligned(const size_t size, const size_t alignment);
+
+  // Removes all data from the arena. (Invalidates all pointers returned by
+  // AddSlice and AllocateBytes). Does not cause memory allocation.
+  // May reduce memory footprint, as it discards all allocated buffers but
+  // the last one.
+  // Unless allocations exceed max_buffer_size, repetitive filling up and
+  // resetting normally lead to quickly settling memory footprint and ceasing
+  // buffer allocations, as the arena keeps reusing a single, large buffer.
+  void Reset();
+
+  // Returns the memory footprint of this arena, in bytes, defined as a sum of
+  // all buffer sizes. Always greater or equal to the total number of
+  // bytes allocated out of the arena.
+  size_t memory_footprint() const;
+
+ private:
+  typedef typename ArenaTraits<THREADSAFE>::mutex_type mutex_type;
+  // Encapsulates a single buffer in the arena.
+  class Component;
+
+  // Fallback for AllocateBytes non-fast-path
+  void* AllocateBytesFallback(const size_t size, const size_t align);
+
+  Component* NewComponent(size_t requested_size, size_t minimum_size);
+  void AddComponent(Component *component);
+
+  // Load the current component, with "Acquire" semantics (see atomicops.h)
+  // if the arena is meant to be thread-safe.
+  inline Component* AcquireLoadCurrent() {
+    if (THREADSAFE) {
+      return reinterpret_cast<Component*>(
+        base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&current_)));
+    } else {
+      return current_;
+    }
+  }
+
+  // Store the current component, with "Release" semantics (see atomicops.h)
+  // if the arena is meant to be thread-safe.
+  inline void ReleaseStoreCurrent(Component* c) {
+    if (THREADSAFE) {
+      base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&current_),
+                                  reinterpret_cast<AtomicWord>(c));
+    } else {
+      current_ = c;
+    }
+  }
+
+  BufferAllocator* const buffer_allocator_;
+  vector<std::unique_ptr<Component> > arena_;
+
+  // The current component to allocate from.
+  // Use AcquireLoadCurrent and ReleaseStoreCurrent to load/store.
+  Component* current_;
+  const size_t max_buffer_size_;
+  size_t arena_footprint_;
+
+  // Lock covering 'slow path' allocation, when new components are
+  // allocated and added to the arena's list. Also covers any other
+  // mutation of the component data structure (eg Reset).
+  mutable mutex_type component_lock_;
+
+  DISALLOW_COPY_AND_ASSIGN(ArenaBase);
+};
+
+// STL-compliant allocator, for use with hash_maps and other structures
+// which share lifetime with an Arena. Enables memory control and improves
+// performance.
+template<class T, bool THREADSAFE> class ArenaAllocator {
+ public:
+  typedef T value_type;
+  typedef size_t size_type;
+  typedef ptrdiff_t difference_type;
+
+  typedef T* pointer;
+  typedef const T* const_pointer;
+  typedef T& reference;
+  typedef const T& const_reference;
+  pointer index(reference r) const  { return &r; }
+  const_pointer index(const_reference r) const  { return &r; }
+  size_type max_size() const  { return size_t(-1) / sizeof(T); }
+
+  explicit ArenaAllocator(ArenaBase<THREADSAFE>* arena) : arena_(arena) {
+    CHECK_NOTNULL(arena_);
+  }
+
+  ~ArenaAllocator() { }
+
+  pointer allocate(size_type n, allocator<void>::const_pointer /*hint*/ = 0) {
+    return reinterpret_cast<T*>(arena_->AllocateBytes(n * sizeof(T)));
+  }
+
+  void deallocate(pointer p, size_type n) {}
+
+  void construct(pointer p, const T& val) {
+    new(reinterpret_cast<void*>(p)) T(val);
+  }
+
+  void destroy(pointer p) { p->~T(); }
+
+  template<class U> struct rebind {
+    typedef ArenaAllocator<U, THREADSAFE> other;
+  };
+
+  template<class U, bool TS> ArenaAllocator(const ArenaAllocator<U, TS>& other)
+      : arena_(other.arena()) { }
+
+  template<class U, bool TS> bool operator==(const ArenaAllocator<U, TS>& other) const {
+    return arena_ == other.arena();
+  }
+
+  template<class U, bool TS> bool operator!=(const ArenaAllocator<U, TS>& other) const {
+    return arena_ != other.arena();
+  }
+
+  ArenaBase<THREADSAFE> *arena() const {
+    return arena_;
+  }
+
+ private:
+
+  ArenaBase<THREADSAFE>* arena_;
+};
+
+
+class Arena : public ArenaBase<false> {
+ public:
+  explicit Arena(size_t initial_buffer_size, size_t max_buffer_size) :
+    ArenaBase<false>(initial_buffer_size, max_buffer_size)
+  {}
+};
+
+class ThreadSafeArena : public ArenaBase<true> {
+ public:
+  explicit ThreadSafeArena(size_t initial_buffer_size, size_t max_buffer_size) :
+    ArenaBase<true>(initial_buffer_size, max_buffer_size)
+  {}
+};
+
+// Arena implementation that is integrated with MemTracker in order to
+// track heap-allocated space consumed by the arena.
+
+class MemoryTrackingArena : public ArenaBase<false> {
+ public:
+
+  MemoryTrackingArena(
+      size_t initial_buffer_size,
+      size_t max_buffer_size,
+      const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator)
+      : ArenaBase<false>(tracking_allocator.get(), initial_buffer_size, max_buffer_size),
+        tracking_allocator_(tracking_allocator) {}
+
+  ~MemoryTrackingArena() {
+  }
+
+ private:
+
+  // This is required in order for the Arena to survive even after tablet is shut down,
+  // e.g., in the case of Scanners running scanners (see tablet_server-test.cc)
+  std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_;
+};
+
+class ThreadSafeMemoryTrackingArena : public ArenaBase<true> {
+ public:
+
+  ThreadSafeMemoryTrackingArena(
+      size_t initial_buffer_size,
+      size_t max_buffer_size,
+      const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator)
+      : ArenaBase<true>(tracking_allocator.get(), initial_buffer_size, max_buffer_size),
+        tracking_allocator_(tracking_allocator) {}
+
+  ~ThreadSafeMemoryTrackingArena() {
+  }
+
+ private:
+
+  // See comment in MemoryTrackingArena above.
+  std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_;
+};
+
+// Implementation of inline and template methods
+
+template<bool THREADSAFE>
+class ArenaBase<THREADSAFE>::Component {
+ public:
+  explicit Component(Buffer* buffer)
+      : buffer_(buffer),
+        data_(static_cast<uint8_t*>(buffer->data())),
+        offset_(0),
+        size_(buffer->size()) {}
+
+  // Tries to reserve space in this component. Returns the pointer to the
+  // reserved space if successful; NULL on failure (if there's no more room).
+  uint8_t* AllocateBytes(const size_t size) {
+    return AllocateBytesAligned(size, 1);
+  }
+
+  uint8_t *AllocateBytesAligned(const size_t size, const size_t alignment);
+
+  size_t size() const { return size_; }
+  void Reset() {
+    ASAN_POISON_MEMORY_REGION(data_, size_);
+    offset_ = 0;
+  }
+
+ private:
+  // Mark the given range unpoisoned in ASAN.
+  // This is a no-op in a non-ASAN build.
+  void AsanUnpoison(const void* addr, size_t size);
+
+  gscoped_ptr<Buffer> buffer_;
+  uint8_t* const data_;
+  typename ArenaTraits<THREADSAFE>::offset_type offset_;
+  const size_t size_;
+
+#ifdef ADDRESS_SANITIZER
+  // Lock used around unpoisoning memory when ASAN is enabled.
+  // ASAN does not support concurrent unpoison calls that may overlap a particular
+  // memory word (8 bytes).
+  typedef typename ArenaTraits<THREADSAFE>::spinlock_type spinlock_type;
+  spinlock_type asan_lock_;
+#endif
+  DISALLOW_COPY_AND_ASSIGN(Component);
+};
+
+
+// Thread-safe implementation
+template <>
+inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
+  const size_t size, const size_t alignment) {
+  // Special case check the allowed alignments. Currently, we only ensure
+  // the allocated buffer components are 16-byte aligned, and the code path
+  // doesn't support larger alignment.
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
+         alignment == 8 || alignment == 16)
+    << "bad alignment: " << alignment;
+  retry:
+  Atomic32 offset = Acquire_Load(&offset_);
+
+  Atomic32 aligned = KUDU_ALIGN_UP(offset, alignment);
+  Atomic32 new_offset = aligned + size;
+
+  if (PREDICT_TRUE(new_offset <= size_)) {
+    bool success = Acquire_CompareAndSwap(&offset_, offset, new_offset) == offset;
+    if (PREDICT_TRUE(success)) {
+      AsanUnpoison(data_ + aligned, size);
+      return data_ + aligned;
+    } else {
+      // Raced with another allocator
+      goto retry;
+    }
+  } else {
+    return NULL;
+  }
+}
+
+// Non-Threadsafe implementation
+template <>
+inline uint8_t *ArenaBase<false>::Component::AllocateBytesAligned(
+  const size_t size, const size_t alignment) {
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
+         alignment == 8 || alignment == 16)
+    << "bad alignment: " << alignment;
+  size_t aligned = KUDU_ALIGN_UP(offset_, alignment);
+  uint8_t* destination = data_ + aligned;
+  size_t save_offset = offset_;
+  offset_ = aligned + size;
+  if (PREDICT_TRUE(offset_ <= size_)) {
+    AsanUnpoison(data_ + aligned, size);
+    return destination;
+  } else {
+    offset_ = save_offset;
+    return NULL;
+  }
+}
+
+template <bool THREADSAFE>
+inline void ArenaBase<THREADSAFE>::Component::AsanUnpoison(const void* addr, size_t size) {
+#ifdef ADDRESS_SANITIZER
+  std::lock_guard<spinlock_type> l(asan_lock_);
+  ASAN_UNPOISON_MEMORY_REGION(addr, size);
+#endif
+}
+
+// Fast-path allocation should get inlined, and fall-back
+// to non-inline function call for allocation failure
+template <bool THREADSAFE>
+inline void *ArenaBase<THREADSAFE>::AllocateBytesAligned(const size_t size, const size_t align) {
+  void* result = AcquireLoadCurrent()->AllocateBytesAligned(size, align);
+  if (PREDICT_TRUE(result != NULL)) return result;
+  return AllocateBytesFallback(size, align);
+}
+
+template <bool THREADSAFE>
+inline uint8_t* ArenaBase<THREADSAFE>::AddSlice(const Slice& value) {
+  return reinterpret_cast<uint8_t *>(AddBytes(value.data(), value.size()));
+}
+
+template <bool THREADSAFE>
+inline void *ArenaBase<THREADSAFE>::AddBytes(const void *data, size_t len) {
+  void* destination = AllocateBytes(len);
+  if (destination == NULL) return NULL;
+  memcpy(destination, data, len);
+  return destination;
+}
+
+template <bool THREADSAFE>
+inline bool ArenaBase<THREADSAFE>::RelocateSlice(const Slice &src, Slice *dst) {
+  void* destination = AllocateBytes(src.size());
+  if (destination == NULL) return false;
+  memcpy(destination, src.data(), src.size());
+  *dst = Slice(reinterpret_cast<uint8_t *>(destination), src.size());
+  return true;
+}
+
+
+template <bool THREADSAFE>
+inline bool ArenaBase<THREADSAFE>::RelocateStringPiece(const StringPiece& src, StringPiece* sp) {
+  Slice slice(src.data(), src.size());
+  if (!RelocateSlice(slice, &slice)) return false;
+  *sp = StringPiece(reinterpret_cast<const char*>(slice.data()), slice.size());
+  return true;
+}
+
+template<bool THREADSAFE>
+template<class T, class ... Args>
+inline T *ArenaBase<THREADSAFE>::NewObject(Args&&... args) {
+  void *mem = AllocateBytesAligned(sizeof(T), alignof(T));
+  if (mem == NULL) throw std::bad_alloc();
+  return new (mem) T(std::forward<Args>(args)...);
+}
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_MEMORY_ARENA_H_
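
A short usage sketch of the interface declared in this header (assuming the surrounding Kudu build environment; the local names and sizes are illustrative only):

#include <cstdint>
#include <vector>

#include "kudu/util/memory/arena.h"
#include "kudu/util/slice.h"

// Illustrative sketch of common Arena operations.
void ArenaUsageSketch() {
  kudu::Arena arena(/*initial_buffer_size=*/1024, /*max_buffer_size=*/64 * 1024);

  // Copy a Slice into arena-owned memory; 'out' may alias 'in'.
  kudu::Slice in("hello", 5);
  kudu::Slice out;
  if (!arena.RelocateSlice(in, &out)) {
    return;  // the allocator refused to grow the arena
  }

  // Placement-construct an object with alignof(T) alignment.
  struct Point { int64_t x; int64_t y; };
  Point* p = arena.NewObject<Point>();
  p->x = 1;
  p->y = 2;

  // An STL container whose elements live in the arena.
  typedef kudu::ArenaAllocator<int, false> IntAlloc;
  IntAlloc alloc(&arena);
  std::vector<int, IntAlloc> v(alloc);
  v.push_back(42);

  // Invalidates every pointer handed out above but keeps the last buffer.
  arena.Reset();
}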

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/memory.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/memory.cc b/be/src/kudu/util/memory/memory.cc
new file mode 100644
index 0000000..9db0464
--- /dev/null
+++ b/be/src/kudu/util/memory/memory.cc
@@ -0,0 +1,338 @@
+// Copyright 2010 Google Inc.  All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "kudu/util/memory/memory.h"
+
+#include <string.h>
+
+#include <algorithm>
+#include <cstdlib>
+
+#include <gflags/gflags.h>
+
+#include "kudu/util/alignment.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/memory/overwrite.h"
+#include "kudu/util/mem_tracker.h"
+
+using std::copy;
+using std::min;
+
+// TODO(onufry) - test whether the code still tests OK if we set this to true,
+// or remove this code and add a test that Google allocator does not change its
+// contract - 16-aligned in -c opt and %16 == 8 in debug.
+DEFINE_bool(allocator_aligned_mode, false,
+            "Use 16-byte alignment instead of 8-byte, "
+            "unless explicitly specified otherwise - to boost SIMD");
+TAG_FLAG(allocator_aligned_mode, hidden);
+
+namespace kudu {
+
+namespace {
+static char dummy_buffer[0] = {};
+}
+
+Buffer::~Buffer() {
+#if !defined(NDEBUG) && !defined(ADDRESS_SANITIZER)
+  // "unrolling" the string "BAD" makes for a much more efficient
+  // OverwriteWithPattern call in debug mode, so we can keep this
+  // useful bit of code without tests going slower!
+  //
+  // In ASAN mode, we don't bother with this, because when we free the memory, ASAN will
+  // prevent us from accessing it anyway.
+  OverwriteWithPattern(reinterpret_cast<char*>(data_), size_,
+                       "BADBADBADBADBADBADBADBADBADBADBAD"
+                       "BADBADBADBADBADBADBADBADBADBADBAD"
+                       "BADBADBADBADBADBADBADBADBADBADBAD");
+#endif
+  if (allocator_ != nullptr) allocator_->FreeInternal(this);
+}
+
+void BufferAllocator::LogAllocation(size_t requested,
+                                    size_t minimal,
+                                    Buffer* buffer) {
+  if (buffer == nullptr) {
+    LOG(WARNING) << "Memory allocation failed. "
+                 << "Number of bytes requested: " << requested
+                 << ", minimal: " << minimal;
+    return;
+  }
+  if (buffer->size() < requested) {
+    LOG(WARNING) << "Memory allocation was shorter than requested. "
+                 << "Number of bytes requested to allocate: " << requested
+                 << ", minimal: " << minimal
+                 << ", and actually allocated: " << buffer->size();
+  }
+}
+
+HeapBufferAllocator::HeapBufferAllocator()
+  : aligned_mode_(FLAGS_allocator_aligned_mode) {
+}
+
+Buffer* HeapBufferAllocator::AllocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    BufferAllocator* const originator) {
+  DCHECK_LE(minimal, requested);
+  void* data;
+  size_t attempted = requested;
+  while (true) {
+    data = (attempted == 0) ? &dummy_buffer[0] : Malloc(attempted);
+    if (data != nullptr) {
+      return CreateBuffer(data, attempted, originator);
+    }
+    if (attempted == minimal) return nullptr;
+    attempted = minimal + (attempted - minimal - 1) / 2;
+  }
+}
+
+bool HeapBufferAllocator::ReallocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    Buffer* const buffer,
+    BufferAllocator* const originator) {
+  DCHECK_LE(minimal, requested);
+  void* data;
+  size_t attempted = requested;
+  while (true) {
+    if (attempted == 0) {
+      if (buffer->size() > 0) free(buffer->data());
+      data = &dummy_buffer[0];
+    } else {
+      if (buffer->size() > 0) {
+        data = Realloc(buffer->data(), buffer->size(), attempted);
+      } else {
+        data = Malloc(attempted);
+      }
+    }
+    if (data != nullptr) {
+      UpdateBuffer(data, attempted, buffer);
+      return true;
+    }
+    if (attempted == minimal) return false;
+    attempted = minimal + (attempted - minimal - 1) / 2;
+  }
+}
+
+void HeapBufferAllocator::FreeInternal(Buffer* buffer) {
+  if (buffer->size() > 0) free(buffer->data());
+}
+
+void* HeapBufferAllocator::Malloc(size_t size) {
+  if (aligned_mode_) {
+    void* data;
+    if (posix_memalign(&data, 16, KUDU_ALIGN_UP(size, 16))) {
+       return nullptr;
+    }
+    return data;
+  } else {
+    return malloc(size);
+  }
+}
+
+void* HeapBufferAllocator::Realloc(void* previousData, size_t previousSize,
+                                   size_t newSize) {
+  if (aligned_mode_) {
+    void* data = Malloc(newSize);
+    if (data) {
+// NOTE(ptab): We should use realloc here to avoid memory copying,
+// but it doesn't work on memory allocated by posix_memalign(...).
+// realloc reallocates the memory but doesn't preserve the content.
+// TODO(ptab): reiterate after some time to check if it is fixed (tcmalloc ?)
+      memcpy(data, previousData, min(previousSize, newSize));
+      free(previousData);
+      return data;
+    } else {
+      return nullptr;
+    }
+  } else {
+    return realloc(previousData, newSize);
+  }
+}
+
+Buffer* ClearingBufferAllocator::AllocateInternal(size_t requested,
+                                                  size_t minimal,
+                                                  BufferAllocator* originator) {
+  Buffer* buffer = DelegateAllocate(delegate_, requested, minimal,
+                                    originator);
+  if (buffer != nullptr) memset(buffer->data(), 0, buffer->size());
+  return buffer;
+}
+
+bool ClearingBufferAllocator::ReallocateInternal(size_t requested,
+                                                 size_t minimal,
+                                                 Buffer* buffer,
+                                                 BufferAllocator* originator) {
+  size_t offset = (buffer != nullptr ? buffer->size() : 0);
+  bool success = DelegateReallocate(delegate_, requested, minimal, buffer,
+                                    originator);
+  if (success && buffer->size() > offset) {
+    memset(static_cast<char*>(buffer->data()) + offset, 0,
+           buffer->size() - offset);
+  }
+  return success;
+}
+
+void ClearingBufferAllocator::FreeInternal(Buffer* buffer) {
+  DelegateFree(delegate_, buffer);
+}
+
+Buffer* MediatingBufferAllocator::AllocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    BufferAllocator* const originator) {
+  // Allow the mediator to trim the request.
+  size_t granted;
+  if (requested > 0) {
+    granted = mediator_->Allocate(requested, minimal);
+    if (granted < minimal) return nullptr;
+  } else {
+    granted = 0;
+  }
+  Buffer* buffer = DelegateAllocate(delegate_, granted, minimal, originator);
+  if (buffer == nullptr) {
+    mediator_->Free(granted);
+  } else if (buffer->size() < granted) {
+    mediator_->Free(granted - buffer->size());
+  }
+  return buffer;
+}
+
+bool MediatingBufferAllocator::ReallocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    Buffer* const buffer,
+    BufferAllocator* const originator) {
+  // Allow the mediator to trim the request. Be conservative; assume that
+  // realloc may degenerate to malloc-memcpy-free.
+  size_t granted;
+  if (requested > 0) {
+    granted = mediator_->Allocate(requested, minimal);
+    if (granted < minimal) return false;
+  } else {
+    granted = 0;
+  }
+  size_t old_size = buffer->size();
+  if (DelegateReallocate(delegate_, granted, minimal, buffer, originator)) {
+    mediator_->Free(granted - buffer->size() + old_size);
+    return true;
+  } else {
+    mediator_->Free(granted);
+    return false;
+  }
+}
+
+void MediatingBufferAllocator::FreeInternal(Buffer* buffer) {
+  mediator_->Free(buffer->size());
+  DelegateFree(delegate_, buffer);
+}
+
+Buffer* MemoryStatisticsCollectingBufferAllocator::AllocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    BufferAllocator* const originator) {
+  Buffer* buffer = DelegateAllocate(delegate_, requested, minimal, originator);
+  if (buffer != nullptr) {
+    memory_stats_collector_->AllocatedMemoryBytes(buffer->size());
+  } else {
+    memory_stats_collector_->RefusedMemoryBytes(minimal);
+  }
+  return buffer;
+}
+
+bool MemoryStatisticsCollectingBufferAllocator::ReallocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    Buffer* const buffer,
+    BufferAllocator* const originator) {
+  const size_t old_size = buffer->size();
+  bool outcome = DelegateReallocate(delegate_, requested, minimal, buffer,
+                                    originator);
+  if (buffer->size() > old_size) {
+    memory_stats_collector_->AllocatedMemoryBytes(buffer->size() - old_size);
+  } else if (buffer->size() < old_size) {
+    memory_stats_collector_->FreedMemoryBytes(old_size - buffer->size());
+  } else if (!outcome && (minimal > buffer->size())) {
+    memory_stats_collector_->RefusedMemoryBytes(minimal - buffer->size());
+  }
+  return outcome;
+}
+
+void MemoryStatisticsCollectingBufferAllocator::FreeInternal(Buffer* buffer) {
+  DelegateFree(delegate_, buffer);
+  memory_stats_collector_->FreedMemoryBytes(buffer->size());
+}
+
+size_t MemoryTrackingBufferAllocator::Available() const {
+  return enforce_limit_ ? mem_tracker_->SpareCapacity() : std::numeric_limits<int64_t>::max();
+}
+
+bool MemoryTrackingBufferAllocator::TryConsume(int64_t bytes) {
+  // Calls TryConsume first, even if enforce_limit_ is false: this
+  // will cause mem_tracker_ to try to free up more memory by GCing.
+  if (!mem_tracker_->TryConsume(bytes)) {
+    if (enforce_limit_) {
+      return false;
+    } else {
+      // If enforce_limit_ is false, allocate memory anyway.
+      mem_tracker_->Consume(bytes);
+    }
+  }
+  return true;
+}
+
+Buffer* MemoryTrackingBufferAllocator::AllocateInternal(size_t requested,
+                                                        size_t minimal,
+                                                        BufferAllocator* originator) {
+  if (TryConsume(requested)) {
+    Buffer* buffer = DelegateAllocate(delegate_, requested, requested, originator);
+    if (buffer == nullptr) {
+      mem_tracker_->Release(requested);
+    } else {
+      return buffer;
+    }
+  }
+
+  if (TryConsume(minimal)) {
+    Buffer* buffer = DelegateAllocate(delegate_, minimal, minimal, originator);
+    if (buffer == nullptr) {
+      mem_tracker_->Release(minimal);
+    }
+    return buffer;
+  }
+
+  return nullptr;
+}
+
+
+bool MemoryTrackingBufferAllocator::ReallocateInternal(size_t requested,
+                                                       size_t minimal,
+                                                       Buffer* buffer,
+                                                       BufferAllocator* originator) {
+  LOG(FATAL) << "Not implemented";
+  return false;
+}
+
+void MemoryTrackingBufferAllocator::FreeInternal(Buffer* buffer) {
+  DelegateFree(delegate_, buffer);
+  mem_tracker_->Release(buffer->size());
+}
+
+}  // namespace kudu
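
As a rough illustration of the best-effort contract implemented above (a sketch only; the function name is invented and only the public BufferAllocator interface from memory.h is assumed):

#include <memory>

#include "kudu/util/memory/memory.h"

// Illustrative sketch: ask for 16 MB but accept anything down to 1 MB.
void BestEffortSketch() {
  kudu::BufferAllocator* allocator = kudu::HeapBufferAllocator::Get();

  std::unique_ptr<kudu::Buffer> buffer(
      allocator->BestEffortAllocate(16 * 1024 * 1024, 1 * 1024 * 1024));
  if (!buffer) {
    return;  // even the minimal request could not be satisfied
  }

  // buffer->size() is somewhere in [1 MB, 16 MB]; on allocation failure the
  // heap allocator above retries with geometrically smaller sizes before
  // giving up. Destroying the Buffer returns the memory via the same allocator.
}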

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/memory.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/memory.h b/be/src/kudu/util/memory/memory.h
new file mode 100644
index 0000000..eabc142
--- /dev/null
+++ b/be/src/kudu/util/memory/memory.h
@@ -0,0 +1,976 @@
+// Copyright 2010 Google Inc.  All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+//
+// Classes for memory management, used by materializations
+// (arenas, segments, and STL collections parametrized via arena allocators)
+// so that memory usage can be controlled at the application level.
+//
+// Materializations can be parametrized by specifying an instance of a
+// BufferAllocator. The allocator implements
+// memory management policy (e.g. setting allocation limits). Allocators may
+// be shared between multiple materializations; e.g. you can designate a
+// single allocator per a single user request, thus setting bounds on memory
+// usage on a per-request basis.
+
+#ifndef KUDU_UTIL_MEMORY_MEMORY_H_
+#define KUDU_UTIL_MEMORY_MEMORY_H_
+
+#include <algorithm>
+#include <glog/logging.h>
+#include <limits>
+#include <memory>
+#include <stddef.h>
+#include <vector>
+
+#include "kudu/util/boost_mutex_utils.h"
+#include "kudu/util/memory/overwrite.h"
+#include "kudu/util/mutex.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/logging-inl.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/singleton.h"
+
+using std::copy;
+using std::max;
+using std::min;
+using std::numeric_limits;
+using std::reverse;
+using std::sort;
+using std::swap;
+using std::vector;
+
+namespace kudu {
+
+class BufferAllocator;
+class MemTracker;
+
+// Wrapper for a block of data allocated by a BufferAllocator. Owns the block.
+// (To release the block, destroy the buffer - it will then return it via the
+// same allocator that has been used to create it).
+class Buffer {
+ public:
+  ~Buffer();
+
+  void* data() const { return data_; }   // The data buffer.
+  size_t size() const { return size_; }  // In bytes.
+
+ private:
+  friend class BufferAllocator;
+
+  Buffer(void* data, size_t size, BufferAllocator* allocator)
+      : data_(CHECK_NOTNULL(data)),
+        size_(size),
+        allocator_(allocator) {
+#ifndef NDEBUG
+    OverwriteWithPattern(reinterpret_cast<char*>(data_), size_,
+                         "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW"
+                         "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW"
+                         "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW");
+#endif
+  }
+
+  // Called by a successful realloc.
+  void Update(void* new_data, size_t new_size) {
+#ifndef NDEBUG
+    if (new_size > size_) {
+      OverwriteWithPattern(reinterpret_cast<char*>(new_data) + size_,
+                           new_size - size_, "NEW");
+    }
+#endif
+    data_ = new_data;
+    size_ = new_size;
+  }
+
+  void* data_;
+  size_t size_;
+  BufferAllocator* const allocator_;
+  DISALLOW_COPY_AND_ASSIGN(Buffer);
+};
+
+// Allocators allow applications to control memory usage. They are
+// used by materializations to allocate blocks of memory arenas.
+// BufferAllocator is an abstract class that defines a common contract of
+// all implementations of allocators. Specific allocators provide specific
+// features, e.g. enforced resource limits, thread safety, etc.
+class BufferAllocator {
+ public:
+  virtual ~BufferAllocator() {}
+
+  // Called by the user when a new block of memory is needed. The 'requested'
+  // parameter specifies how much memory (in bytes) the user would like to get.
+  // The 'minimal' parameter specifies how much he is willing to settle for.
+  // The allocator returns a buffer sized in the range [minimal, requested],
+  // or NULL if the request can't be satisfied. When the buffer is destroyed,
+  // its destructor calls the FreeInternal() method on its allocator.
+  // CAVEAT: The allocator must outlive all buffers returned by it.
+  //
+  // Corner cases:
+  // 1. If requested == 0, the allocator will always return a non-NULL Buffer
+  //    with a non-NULL data pointer and zero capacity.
+  // 2. If minimal == 0, the allocator will always return a non-NULL Buffer
+  //    with a non-NULL data pointer, possibly with zero capacity.
+  Buffer* BestEffortAllocate(size_t requested, size_t minimal) {
+    DCHECK_LE(minimal, requested);
+    Buffer* result = AllocateInternal(requested, minimal, this);
+    LogAllocation(requested, minimal, result);
+    return result;
+  }
+
+  // Called by the user when a new block of memory is needed. Equivalent to
+  // BestEffortAllocate(requested, requested).
+  Buffer* Allocate(size_t requested) {
+    return BestEffortAllocate(requested, requested);
+  }
+
+  // Called by the user when a previously allocated block needs to be resized.
+  // Mimics semantics of <cstdlib> realloc. The 'requested' and 'minimal'
+  // represent the desired final buffer size, with semantics as in the Allocate.
+  // If the 'buffer' parameter is NULL, the call is equivalent to
+  // Allocate(requested, minimal). Otherwise, a reallocation of the buffer's
+  // data is attempted. On success, the original 'buffer' parameter is returned,
+  // but the buffer itself might have updated size and data. On failure,
+  // returns NULL, and leaves the input buffer unmodified.
+  // Reallocation might happen in-place, preserving the original data
+  // pointer, but it is not guaranteed - e.g. this function might degenerate to
+  // Allocate-Copy-Free. Either way, the content of the data buffer, up to the
+  // minimum of the new and old size, is preserved.
+  //
+  // Corner cases:
+  // 1. If requested == 0, the allocator will always return a non-NULL Buffer
+  //    with a non-NULL data pointer and zero capacity.
+  // 2. If minimal == 0, the allocator will always return a non-NULL Buffer
+  //    with a non-NULL data pointer, possibly with zero capacity.
+  Buffer* BestEffortReallocate(size_t requested,
+                               size_t minimal,
+                               Buffer* buffer) {
+    DCHECK_LE(minimal, requested);
+    Buffer* result;
+    if (buffer == NULL) {
+      result = AllocateInternal(requested, minimal, this);
+      LogAllocation(requested, minimal, result);
+      return result;
+    } else {
+      result =  ReallocateInternal(requested, minimal, buffer, this) ?
+          buffer : NULL;
+      LogAllocation(requested, minimal, buffer);
+      return result;
+    }
+  }
+
+  // Called by the user when a previously allocated block needs to be resized.
+  // Equivalent to BestEffortReallocate(requested, requested, buffer).
+  Buffer* Reallocate(size_t requested, Buffer* buffer) {
+    return BestEffortReallocate(requested, requested, buffer);
+  }
+
+  // Returns the amount of memory (in bytes) still available for this allocator.
+  // For unbounded allocators (like raw HeapBufferAllocator) this is the highest
+  // size_t value possible.
+  // TODO(user): consider making pure virtual.
+  virtual size_t Available() const { return numeric_limits<size_t>::max(); }
+
+ protected:
+  friend class Buffer;
+
+  BufferAllocator() {}
+
+  // Expose the constructor to subclasses of BufferAllocator.
+  Buffer* CreateBuffer(void* data,
+                       size_t size,
+                       BufferAllocator* allocator) {
+    return new Buffer(data, size, allocator);
+  }
+
+  // Expose Buffer::Update to subclasses of BufferAllocator.
+  void UpdateBuffer(void* new_data, size_t new_size, Buffer* buffer) {
+    buffer->Update(new_data, new_size);
+  }
+
+  // Called by chained buffer allocators.
+  Buffer* DelegateAllocate(BufferAllocator* delegate,
+                           size_t requested,
+                           size_t minimal,
+                           BufferAllocator* originator) {
+    return delegate->AllocateInternal(requested, minimal, originator);
+  }
+
+  // Called by chained buffer allocators.
+  bool DelegateReallocate(BufferAllocator* delegate,
+                          size_t requested,
+                          size_t minimal,
+                          Buffer* buffer,
+                          BufferAllocator* originator) {
+    return delegate->ReallocateInternal(requested, minimal, buffer, originator);
+  }
+
+  // Called by chained buffer allocators.
+  void DelegateFree(BufferAllocator* delegate, Buffer* buffer) {
+    delegate->FreeInternal(buffer);
+  }
+
+ private:
+  // Implemented by concrete subclasses.
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) = 0;
+
+  // Implemented by concrete subclasses. Returns false on failure.
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) = 0;
+
+  // Implemented by concrete subclasses.
+  virtual void FreeInternal(Buffer* buffer) = 0;
+
+  // Logs a warning message if the allocation failed or if it returned less than
+  // the required number of bytes.
+  void LogAllocation(size_t required, size_t minimal, Buffer* buffer);
+
+  DISALLOW_COPY_AND_ASSIGN(BufferAllocator);
+};
+
+// Allocates buffers on the heap, with no memory limits. Uses standard C
+// allocation functions (malloc, realloc, free).
+class HeapBufferAllocator : public BufferAllocator {
+ public:
+  virtual ~HeapBufferAllocator() {}
+
+  // Returns a singleton instance of the heap allocator.
+  static HeapBufferAllocator* Get() {
+    return Singleton<HeapBufferAllocator>::get();
+  }
+
+  virtual size_t Available() const OVERRIDE {
+    return numeric_limits<size_t>::max();
+  }
+
+ private:
+  // Allocates memory that is aligned to a 16-byte boundary.
+  // Use if you want to boost SIMD operations on the memory area.
+  const bool aligned_mode_;
+
+  friend class Singleton<HeapBufferAllocator>;
+
+  // Always allocates 'requested'-sized buffer, or returns NULL on OOM.
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  void* Malloc(size_t size);
+  void* Realloc(void* previousData, size_t previousSize, size_t newSize);
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  HeapBufferAllocator();
+  explicit HeapBufferAllocator(bool aligned_mode)
+      : aligned_mode_(aligned_mode) {}
+
+  DISALLOW_COPY_AND_ASSIGN(HeapBufferAllocator);
+};
+
+// Wrapper around the delegate allocator, that clears all newly allocated
+// (and reallocated) memory.
+class ClearingBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate.
+  explicit ClearingBufferAllocator(BufferAllocator* delegate)
+      : delegate_(delegate) {}
+
+  virtual size_t Available() const OVERRIDE {
+    return delegate_->Available();
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  BufferAllocator* delegate_;
+  DISALLOW_COPY_AND_ASSIGN(ClearingBufferAllocator);
+};
+
+// Abstract policy for modifying allocation requests - e.g. enforcing quotas.
+class Mediator {
+ public:
+  Mediator() {}
+  virtual ~Mediator() {}
+
+  // Called by an allocator when an allocation request is processed.
+  // Must return a value in the range [minimal, requested], or zero. Returning
+  // zero (if minimal is non-zero) indicates that the allocation is denied.
+  // Returning non-zero indicates that the request should be capped at that
+  // value.
+  virtual size_t Allocate(size_t requested, size_t minimal) = 0;
+
+  // Called by an allocator when the specified amount (in bytes) is released.
+  virtual void Free(size_t amount) = 0;
+
+  // TODO(user): consider making pure virtual.
+  virtual size_t Available() const { return numeric_limits<size_t>::max(); }
+};
+
+// Optionally thread-safe skeletal implementation of a 'quota' abstraction,
+// providing methods to allocate resources against the quota, and return them.
+template<bool thread_safe>
+class Quota : public Mediator {
+ public:
+  explicit Quota(bool enforced) : usage_(0), enforced_(enforced) {}
+  virtual ~Quota() {}
+
+  // Returns a value in range [minimal, requested] if not exceeding remaining
+  // quota or if the quota is not enforced (soft quota), and adjusts the usage
+  // value accordingly.  Otherwise, returns zero. The semantics of 'remaining
+  // quota' are defined by subclasses (that must supply GetQuotaInternal()
+  // method).
+  virtual size_t Allocate(size_t requested, size_t minimal) OVERRIDE;
+
+  virtual void Free(size_t amount) OVERRIDE;
+
+  // Returns memory still available in the quota. For unenforced Quota objects,
+  // you are still able to perform allocations of the 'minimal' size even when
+  // the available quota is 0 (or less than 'minimal').
+  virtual size_t Available() const OVERRIDE {
+    lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex());
+    const size_t quota = GetQuotaInternal();
+    return (usage_ >= quota) ? 0 : (quota - usage_);
+  }
+
+  // Returns the current quota value.
+  size_t GetQuota() const;
+
+  // Returns the current usage value, defined as the sum of all the values
+  // granted by calls to Allocate, less those released via calls to Free.
+
+  bool enforced() const {
+    return enforced_;
+  }
+
+ protected:
+  // Overridden by specific implementations to define the semantics of
+  // the quota, i.e. the total amount of resources that the mediator will
+  // allocate. Called directly from GetQuota(), which optionally provides
+  // thread safety. An 'Allocate' request will succeed if
+  // GetUsage() + minimal <= GetQuota() or if the quota is not enforced (soft
+  // quota).
+  virtual size_t GetQuotaInternal() const = 0;
+
+  Mutex* mutex() const { return thread_safe ? &mutex_ : NULL; }
+
+ private:
+  mutable Mutex mutex_;
+  size_t usage_;
+  bool enforced_;
+  DISALLOW_COPY_AND_ASSIGN(Quota);
+};
+
+// Optionally thread-safe static quota implementation (where quota is
+// explicitly set to a concrete numeric value).
+template<bool thread_safe>
+class StaticQuota : public Quota<thread_safe> {
+ public:
+  explicit StaticQuota(size_t quota)
+      : Quota<thread_safe>(true) {
+    SetQuota(quota);
+  }
+  StaticQuota(size_t quota, bool enforced)
+      : Quota<thread_safe>(enforced) {
+    SetQuota(quota);
+  }
+  virtual ~StaticQuota() {}
+
+  // Sets quota to the new value.
+  void SetQuota(const size_t quota);
+
+ protected:
+  virtual size_t GetQuotaInternal() const { return quota_; }
+
+ private:
+  size_t quota_;
+  DISALLOW_COPY_AND_ASSIGN(StaticQuota);
+};
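
To make the soft-quota behaviour concrete, a short sketch against the Allocate()/Free() implementations at the bottom of this file (the sizes here are arbitrary):

    StaticQuota<false> quota(1024, /* enforced = */ false);  // soft quota of 1024 bytes

    size_t g1 = quota.Allocate(/* requested = */ 4096, /* minimal = */ 512);
    // g1 == 1024: the request is capped at the remaining quota; usage is now 1024.

    size_t g2 = quota.Allocate(4096, 512);
    // g2 == 512: the quota is exhausted, but since it is not enforced the
    // minimal amount is still granted (and a warning is logged); usage is 1536.

    quota.Free(1536);  // usage drops back to 0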
+
+// Places resource limits on another allocator, using the specified Mediator
+// (e.g. quota) implementation.
+//
+// If the mediator and the delegate allocator are thread-safe, this allocator
+// is also thread-safe, to the extent that it will not introduce any
+// state inconsistencies. However, without additional synchronization,
+// allocation requests are not atomic end-to-end. This way, it is deadlock-
+// resilient (even if you have cyclic relationships between allocators) and
+// allows better concurrency. But, it may cause over-conservative
+// allocations under memory contention, if you have multiple levels of
+// mediating allocators. For example, if two requests that can't both be
+// satisfied are submitted concurrently, it may happen that one of them
+// succeeds but gets a smaller buffer allocated than it would if the requests
+// were strictly ordered. This is usually not a problem, however, as you don't
+// really want to operate so close to memory limits that some of your
+// allocations can't be satisfied. If you do have a simple, cascading graph of
+// allocators though, and want to force requests to be atomic end-to-end, put
+// a ThreadSafeBufferAllocator at the entry point.
+class MediatingBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate, nor the mediator, allowing
+  // both to be reused.
+  MediatingBufferAllocator(BufferAllocator* const delegate,
+                           Mediator* const mediator)
+      : delegate_(delegate),
+        mediator_(mediator) {}
+
+  virtual ~MediatingBufferAllocator() {}
+
+  virtual size_t Available() const OVERRIDE {
+    return min(delegate_->Available(), mediator_->Available());
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  BufferAllocator* delegate_;
+  Mediator* const mediator_;
+};
+
+// Convenience non-thread-safe static memory bounds enforcer.
+// Combines MediatingBufferAllocator with a StaticQuota.
+class MemoryLimit : public BufferAllocator {
+ public:
+  // Creates a limiter based on the default (heap) allocator. The quota is
+  // infinite, but can be set later using SetQuota().
+  MemoryLimit()
+      : quota_(std::numeric_limits<size_t>::max()),
+        allocator_(HeapBufferAllocator::Get(), &quota_) {}
+
+  // Creates a limiter based on the default (heap) allocator.
+  explicit MemoryLimit(size_t quota)
+      : quota_(quota),
+        allocator_(HeapBufferAllocator::Get(), &quota_) {}
+
+  // Creates a limiter relaying to the specified delegate allocator.
+  MemoryLimit(size_t quota, BufferAllocator* const delegate)
+      : quota_(quota),
+        allocator_(delegate, &quota_) {}
+
+  // Creates a (possibly non-enforcing) limiter relaying to the specified
+  // delegate allocator.
+  MemoryLimit(size_t quota, bool enforced, BufferAllocator* const delegate)
+      : quota_(quota, enforced),
+        allocator_(delegate, &quota_) {}
+
+  virtual ~MemoryLimit() {}
+
+  virtual size_t Available() const OVERRIDE {
+    return allocator_.Available();
+  }
+
+  size_t GetQuota() const { return quota_.GetQuota(); }
+  size_t GetUsage() const { return quota_.GetUsage(); }
+  void SetQuota(const size_t quota) { quota_.SetQuota(quota); }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    return DelegateAllocate(&allocator_, requested, minimal, originator);
+  }
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    return DelegateReallocate(&allocator_, requested, minimal, buffer,
+                              originator);
+  }
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    DelegateFree(&allocator_, buffer);
+  }
+
+  StaticQuota<false> quota_;
+  MediatingBufferAllocator allocator_;
+};
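
A small usage sketch, assuming the public Allocate()/BestEffortAllocate() entry points declared in the earlier part of this header, and assuming (as elsewhere in this codebase) that deleting a Buffer returns its memory to the allocator that produced it; the sizes are arbitrary:

    MemoryLimit limit(1024 * 1024);  // 1 MiB hard cap over HeapBufferAllocator::Get()

    gscoped_ptr<Buffer> big(limit.Allocate(512 * 1024));   // OK: 512 KiB of quota used
    gscoped_ptr<Buffer> huge(limit.Allocate(768 * 1024));  // NULL: would exceed the quota
    gscoped_ptr<Buffer> rest(limit.BestEffortAllocate(768 * 1024, 64 * 1024));
    // 'rest' holds at most the remaining 512 KiB and at least 64 KiB, or NULL
    // if even the minimal amount cannot be obtained from the heap.

    LOG(INFO) << "used: " << limit.GetUsage() << ", left: " << limit.Available();
    // The gscoped_ptrs release the buffers (and hence the quota) on scope exit.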
+
+// An allocator that allows bypassing the (potential) soft quota below for a
+// given amount of memory usage. The goal is to make the allocation methods and
+// Available() work as if the allocator below had at least bypassed_amount of
+// soft quota. Of course this class doesn't allow exceeding the hard quota.
+class SoftQuotaBypassingBufferAllocator : public BufferAllocator {
+ public:
+  SoftQuotaBypassingBufferAllocator(BufferAllocator* allocator,
+                                    size_t bypassed_amount)
+      : allocator_(std::numeric_limits<size_t>::max(), allocator),
+        bypassed_amount_(bypassed_amount) {}
+
+  virtual size_t Available() const OVERRIDE {
+    const size_t usage = allocator_.GetUsage();
+    size_t available = allocator_.Available();
+    if (bypassed_amount_ > usage) {
+      available = max(bypassed_amount_ - usage, available);
+    }
+    return available;
+  }
+
+ private:
+  // Calculates how much to increase the minimal parameter to allocate more
+  // aggressively in the underlying allocator. This is to avoid getting only
+  // very small allocations when we exceed the soft quota below. The request
+  // with increased minimal size is more likely to fail because of exceeding
+  // hard quota, so we also fall back to the original minimal size.
+  size_t AdjustMinimal(size_t requested, size_t minimal) const {
+    return min(requested, max(minimal, Available()));
+  }
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    // Try increasing the "minimal" parameter to allocate more aggressively
+    // within the bypassed amount of soft quota.
+    Buffer* result = DelegateAllocate(&allocator_,
+                                      requested,
+                                      AdjustMinimal(requested, minimal),
+                                      originator);
+    if (result != NULL) {
+      return result;
+    } else {
+      return DelegateAllocate(&allocator_,
+                              requested,
+                              minimal,
+                              originator);
+    }
+  }
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    if (DelegateReallocate(&allocator_,
+                           requested,
+                           AdjustMinimal(requested, minimal),
+                           buffer,
+                           originator)) {
+      return true;
+    } else {
+      return DelegateReallocate(&allocator_,
+                                requested,
+                                minimal,
+                                buffer,
+                                originator);
+    }
+  }
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    DelegateFree(&allocator_, buffer);
+  }
+
+  // A MemoryLimit with an "infinite" quota, used to obtain GetUsage().
+  MemoryLimit allocator_;
+  size_t bypassed_amount_;
+};
+
+// An interface for a MemoryStatisticsCollector - an object which collects
+// information about the memory usage of the allocator. The collector will
+// gather statistics about memory usage based on information received from the
+// allocator.
+class MemoryStatisticsCollectorInterface {
+ public:
+  MemoryStatisticsCollectorInterface() {}
+
+  virtual ~MemoryStatisticsCollectorInterface() {}
+
+  // Informs the collector that the allocator granted 'bytes' bytes of memory.
+  // Note that in the case of reallocation, 'bytes' should be the increase in
+  // total memory usage, not the total size of the buffer after reallocation.
+  virtual void AllocatedMemoryBytes(size_t bytes) = 0;
+
+  // Informs the collector that the allocator received a request for at least
+  // 'bytes' bytes of memory, and rejected it (meaning that it granted
+  // nothing).
+  virtual void RefusedMemoryBytes(size_t bytes) = 0;
+
+  // Informs the collector that 'bytes' bytes of memory have been released to
+  // the allocator.
+  virtual void FreedMemoryBytes(size_t bytes) = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MemoryStatisticsCollectorInterface);
+};
+
+class MemoryStatisticsCollectingBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate.
+  // Takes ownership of memory_stats_collector.
+  MemoryStatisticsCollectingBufferAllocator(
+      BufferAllocator* const delegate,
+      MemoryStatisticsCollectorInterface* const memory_stats_collector)
+      : delegate_(delegate),
+        memory_stats_collector_(memory_stats_collector) {}
+
+  virtual ~MemoryStatisticsCollectingBufferAllocator() {}
+
+  virtual size_t Available() const OVERRIDE {
+    return delegate_->Available();
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  BufferAllocator* delegate_;
+  gscoped_ptr<MemoryStatisticsCollectorInterface>
+      memory_stats_collector_;
+};
+
+// BufferAllocator which uses MemTracker to keep track of and optionally
+// (if a limit is set on the MemTracker) regulate memory consumption.
+class MemoryTrackingBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate. The delegate must remain
+  // valid for the lifetime of this allocator. Increments reference
+  // count for 'mem_tracker'.
+  // If 'mem_tracker' has a limit and 'enforce_limit' is true, then
+  // the classes calling this buffer allocator (whether directly, or
+  // through an Arena) must be able to handle the case when allocation
+  // fails. If 'enforce_limit' is false (this is the default), then
+  // allocation will always succeed.
+  MemoryTrackingBufferAllocator(BufferAllocator* const delegate,
+                                std::shared_ptr<MemTracker> mem_tracker,
+                                bool enforce_limit = false)
+      : delegate_(delegate),
+        mem_tracker_(std::move(mem_tracker)),
+        enforce_limit_(enforce_limit) {}
+
+  virtual ~MemoryTrackingBufferAllocator() {}
+
+  // If enforce_limit_ is false, this always returns the maximum possible value
+  // for int64_t (std::numeric_limits<int64_t>::max()). Otherwise, this
+  // is equivalent to calling mem_tracker_->SpareCapacity().
+  virtual size_t Available() const OVERRIDE;
+
+ private:
+
+  // If enforce_limit_ is true, this is equivalent to calling
+  // mem_tracker_->TryConsume(bytes). If enforce_limit_ is false and
+  // mem_tracker_->TryConsume(bytes) is false, we call
+  // mem_tracker_->Consume(bytes) and always return true.
+  bool TryConsume(int64_t bytes);
+
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  BufferAllocator* delegate_;
+  std::shared_ptr<MemTracker> mem_tracker_;
+  bool enforce_limit_;
+};
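
A wiring sketch; CreateTrackerWithLimit() below is a hypothetical stand-in for however the MemTracker is actually obtained (see the MemTracker header for the real factory methods):

    // Hypothetical helper standing in for the real MemTracker factory.
    std::shared_ptr<MemTracker> tracker = CreateTrackerWithLimit(64 * 1024 * 1024);

    // With 'enforce_limit' set, callers must be prepared for allocations to
    // fail once the tracker's limit is reached.
    MemoryTrackingBufferAllocator tracking(HeapBufferAllocator::Get(), tracker,
                                           /* enforce_limit = */ true);
    LOG(INFO) << "Bytes left before the limit: " << tracking.Available();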
+
+// Synchronizes access to AllocateInternal, ReallocateInternal and
+// FreeInternal, and exposes the mutex for use by subclasses. Allocation
+// requests performed through this allocator are atomic end-to-end. The
+// template parameter DelegateAllocatorType allows specifying a subclass of
+// BufferAllocator for the delegate, so that subclasses of
+// ThreadSafeBufferAllocator can access additional methods provided by the
+// allocator subclass. If this is not needed, it can be set to
+// BufferAllocator.
+template <class DelegateAllocatorType>
+class ThreadSafeBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate.
+  explicit ThreadSafeBufferAllocator(DelegateAllocatorType* delegate)
+      : delegate_(delegate) {}
+  virtual ~ThreadSafeBufferAllocator() {}
+
+  virtual size_t Available() const OVERRIDE {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return delegate()->Available();
+  }
+
+ protected:
+  Mutex* mutex() const { return &mutex_; }
+  // Expose the delegate allocator, with the precise type of the allocator
+  // specified by the template parameter. The delegate() methods themselves
+  // don't give any thread-safety guarantees. Protect all uses taking the Mutex
+  // exposed by the mutex() method.
+  DelegateAllocatorType* delegate() { return delegate_; }
+  const DelegateAllocatorType* delegate() const { return delegate_; }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return DelegateAllocate(delegate(), requested, minimal, originator);
+  }
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return DelegateReallocate(delegate(), requested, minimal, buffer,
+                              originator);
+  }
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    lock_guard_maybe<Mutex> lock(mutex());
+    DelegateFree(delegate(), buffer);
+  }
+
+  DelegateAllocatorType* delegate_;
+  mutable Mutex mutex_;
+  DISALLOW_COPY_AND_ASSIGN(ThreadSafeBufferAllocator);
+};
+
+// A version of ThreadSafeBufferAllocator that owns the supplied delegate
+// allocator.
+template <class DelegateAllocatorType>
+class OwningThreadSafeBufferAllocator
+    : public ThreadSafeBufferAllocator<DelegateAllocatorType> {
+ public:
+  explicit OwningThreadSafeBufferAllocator(DelegateAllocatorType* delegate)
+      : ThreadSafeBufferAllocator<DelegateAllocatorType>(delegate),
+        delegate_owned_(delegate) {}
+  virtual ~OwningThreadSafeBufferAllocator() {}
+
+ private:
+  gscoped_ptr<DelegateAllocatorType> delegate_owned_;
+};
+
+class ThreadSafeMemoryLimit
+    : public OwningThreadSafeBufferAllocator<MemoryLimit> {
+ public:
+  ThreadSafeMemoryLimit(size_t quota, bool enforced,
+                        BufferAllocator* const delegate)
+      : OwningThreadSafeBufferAllocator<MemoryLimit>(
+            new MemoryLimit(quota, enforced, delegate)) {}
+  virtual ~ThreadSafeMemoryLimit() {}
+
+  size_t GetQuota() const {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return delegate()->GetQuota();
+  }
+  size_t GetUsage() const {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return delegate()->GetUsage();
+  }
+  void SetQuota(const size_t quota) {
+    lock_guard_maybe<Mutex> lock(mutex());
+    delegate()->SetQuota(quota);
+  }
+};
+
+// A BufferAllocator that can be given ownership of many objects of a given
+// type. These objects will then be deleted when the buffer allocator is
+// destroyed. The objects added last are deleted first (LIFO).
+template <typename OwnedType>
+class OwningBufferAllocator : public BufferAllocator {
+ public:
+  // Doesn't take ownership of delegate.
+  explicit OwningBufferAllocator(BufferAllocator* const delegate)
+      : delegate_(delegate) {}
+
+  virtual ~OwningBufferAllocator() {
+    // Delete elements starting from the end.
+    while (!owned_.empty()) {
+      OwnedType* p = owned_.back();
+      owned_.pop_back();
+      delete p;
+    }
+  }
+
+  // Add to the collection of objects owned by this allocator. The object added
+  // last is deleted first.
+  OwningBufferAllocator* Add(OwnedType* p) {
+    owned_.push_back(p);
+    return this;
+  }
+
+  virtual size_t Available() const OVERRIDE {
+    return delegate_->Available();
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    return DelegateAllocate(delegate_, requested, minimal, originator);
+  }
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    return DelegateReallocate(delegate_, requested, minimal, buffer,
+                              originator);
+  }
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    DelegateFree(delegate_, buffer);
+  }
+
+  // Not using PointerVector here because we want to guarantee certain order of
+  // deleting elements (starting from the ones added last).
+  vector<OwnedType*> owned_;
+  BufferAllocator* delegate_;
+};
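
For example, using std::string as the owned type (any heap-allocated type works):

    // Pass-through allocation over the heap, plus ownership of helper objects
    // that must outlive the allocations but die with the allocator.
    OwningBufferAllocator<std::string> owning(HeapBufferAllocator::Get());
    owning.Add(new std::string("deleted second"));
    owning.Add(new std::string("deleted first"));
    // Both strings are deleted when 'owning' is destroyed, last-added first.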
+
+// Buffer allocator that tries to guarantee an exact and consistent amount
+// of memory. Uses a hard MemoryLimit to enforce the upper bound, but also
+// guarantees consistent allocations by ignoring minimal requested amounts and
+// always returning the full amount of memory requested if available.
+// Allocations will fail if the memory requested would exceed the quota or if
+// the underlying allocator fails to provide the memory.
+class GuaranteeMemory : public BufferAllocator {
+ public:
+  // Doesn't take ownership of 'delegate'.
+  GuaranteeMemory(size_t memory_quota,
+                  BufferAllocator* delegate)
+      : limit_(memory_quota, true, delegate),
+        memory_guarantee_(memory_quota) {}
+
+  virtual size_t Available() const OVERRIDE {
+    return memory_guarantee_ - limit_.GetUsage();
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    if (requested > Available()) {
+      return NULL;
+    } else {
+      return DelegateAllocate(&limit_, requested, requested, originator);
+    }
+  }
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    int64 additional_memory =
+        requested - (buffer != NULL ? buffer->size() : 0);
+    return additional_memory <= static_cast<int64>(Available())
+        && DelegateReallocate(&limit_, requested, requested,
+                              buffer, originator);
+  }
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    DelegateFree(&limit_, buffer);
+  }
+
+  MemoryLimit limit_;
+  size_t memory_guarantee_;
+  DISALLOW_COPY_AND_ASSIGN(GuaranteeMemory);
+};
+
+// Implementation of inline and template methods
+
+template<bool thread_safe>
+size_t Quota<thread_safe>::Allocate(const size_t requested,
+                                    const size_t minimal) {
+  lock_guard_maybe<Mutex> lock(mutex());
+  DCHECK_LE(minimal, requested)
+      << "\"minimal\" shouldn't be bigger than \"requested\"";
+  const size_t quota = GetQuotaInternal();
+  size_t allocation;
+  if (usage_ > quota || minimal > quota - usage_) {
+    // OOQ (Out of quota).
+    if (!enforced() && minimal <= numeric_limits<size_t>::max() - usage_) {
+      // The quota is unenforced and the value of "minimal" won't cause an
+      // overflow. Perform a minimal allocation.
+      allocation = minimal;
+    } else {
+      allocation = 0;
+    }
+    LOG(WARNING) << "Out of quota. Requested: " << requested
+                 << " bytes, or at least minimal: " << minimal
+                 << ". Current quota value is: " << quota
+                 << " while current usage is: " << usage_
+                 << ". The quota is " << (enforced() ? "" : "not ")
+                 << "enforced. "
+                 << ((allocation == 0) ? "Did not allocate any memory."
+                 : "Allocated the minimal value requested.");
+  } else {
+    allocation = min(requested, quota - usage_);
+  }
+  usage_ += allocation;
+  return allocation;
+}
+
+template<bool thread_safe>
+void Quota<thread_safe>::Free(size_t amount) {
+  lock_guard_maybe<Mutex> lock(mutex());
+  usage_ -= amount;
+  // usage_ can wrap around below zero (size_t underflow) if, for example,
+  // threads allocate/free memory concurrently via the same Quota object that
+  // is not protected with a mutex (thread_safe == false).
+  if (usage_ > (numeric_limits<size_t>::max() - (1 << 28))) {
+    LOG(ERROR) << "Suspiciously big usage_ value: " << usage_
+               << " (could be a result size_t wrapping around below 0, "
+               << "for example as a result of race condition).";
+  }
+}
+
+template<bool thread_safe>
+size_t Quota<thread_safe>::GetQuota() const {
+  lock_guard_maybe<Mutex> lock(mutex());
+  return GetQuotaInternal();
+}
+
+template<bool thread_safe>
+size_t Quota<thread_safe>::GetUsage() const {
+  lock_guard_maybe<Mutex> lock(mutex());
+  return usage_;
+}
+
+template<bool thread_safe>
+void StaticQuota<thread_safe>::SetQuota(const size_t quota) {
+  lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex());
+  quota_ = quota;
+}
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_MEMORY_MEMORY_H_
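
As a closing sketch of how these pieces are meant to compose (per the MediatingBufferAllocator comment above, a thread-safe limiter sits at the entry point; the sizes are arbitrary):

    // A hard, thread-safe 64 MiB limit over the process-wide heap allocator.
    ThreadSafeMemoryLimit entry_point(64 * 1024 * 1024, /* enforced = */ true,
                                      HeapBufferAllocator::Get());

    // A soft per-operation 8 MiB limit stacked on top of the entry point.
    MemoryLimit per_op(8 * 1024 * 1024, /* enforced = */ false, &entry_point);

    LOG(INFO) << "Bytes available at the entry point: " << entry_point.Available();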

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/overwrite.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/overwrite.cc 
b/be/src/kudu/util/memory/overwrite.cc
new file mode 100644
index 0000000..c70cf4a
--- /dev/null
+++ b/be/src/kudu/util/memory/overwrite.cc
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/memory/overwrite.h"
+
+#include "kudu/gutil/strings/stringpiece.h"
+
+#include <string.h>
+#include <glog/logging.h>
+namespace kudu {
+
+void OverwriteWithPattern(char* p, size_t len, StringPiece pattern) {
+  size_t pat_len = pattern.size();
+  CHECK_LT(0, pat_len);
+  size_t rem = len;
+  const char *pat_ptr = pattern.data();
+
+  while (rem >= pat_len) {
+    memcpy(p, pat_ptr, pat_len);
+    p += pat_len;
+    rem -= pat_len;
+  }
+
+  while (rem-- > 0) {
+    *p++ = *pat_ptr++;
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memory/overwrite.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/overwrite.h 
b/be/src/kudu/util/memory/overwrite.h
new file mode 100644
index 0000000..04286ed
--- /dev/null
+++ b/be/src/kudu/util/memory/overwrite.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_MEMORY_OVERWRITE_H
+#define KUDU_MEMORY_OVERWRITE_H
+
+#include "kudu/gutil/strings/stringpiece.h"
+
+namespace kudu {
+
+// Overwrite 'p' with enough repetitions of 'pattern' to fill 'len'
+// bytes. This is optimized at -O3 even in debug builds, so is
+// reasonably efficient to use.
+void OverwriteWithPattern(char* p, size_t len, StringPiece pattern);
+
+} // namespace kudu
+#endif /* KUDU_MEMORY_OVERWRITE_H */
+
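
A brief usage sketch of the function declared above (buffer and pattern are arbitrary):

    #include "kudu/util/memory/overwrite.h"

    char scratch[10];
    kudu::OverwriteWithPattern(scratch, sizeof(scratch), "ABC");
    // scratch now holds A B C A B C A B C A: the pattern is repeated whole
    // while it fits, then copied byte-by-byte for the remainder.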
