http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h 
b/be/src/runtime/bufferpool/buffer-pool.h
new file mode 100644
index 0000000..7839612
--- /dev/null
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -0,0 +1,426 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BUFFER_POOL_H
+#define IMPALA_RUNTIME_BUFFER_POOL_H
+
+#include <stdint.h>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/locks.hpp>
+#include <string>
+
+#include "runtime/bufferpool/buffer-allocator.h"
+#include "runtime/bufferpool/buffer-pool-counters.h"
+#include "common/atomic.h"
+#include "common/status.h"
+#include "gutil/macros.h"
+#include "util/internal-queue.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class BufferAllocator;
+class ReservationTracker;
+
+/// A buffer pool that manages memory buffers for all queries in an Impala 
daemon.
+/// The buffer pool enforces buffer reservations, limits, and implements 
policies
+/// for moving spilled memory from in-memory buffers to disk. It also enables 
reuse of
+/// buffers between queries, to avoid frequent allocations.
+///
+/// The buffer pool can be used for allocating any large buffers (above a 
configurable
+/// minimum length), whether or not the buffers will be spilled. Smaller 
allocations
+/// are not serviced directly by the buffer pool: clients of the buffer pool 
must
+/// subdivide buffers if they wish to use smaller allocations.
+///
+/// All buffer pool operations are in the context of a registered buffer pool 
client.
+/// A buffer pool client should be created for every allocator of buffers at 
the level
+/// of granularity required for reporting and enforcement of reservations, 
e.g. an exec
+/// node. The client tracks buffer reservations via its ReservationTracker and 
also
+/// includes info that is helpful for debugging (e.g. the operator that is 
associated
+/// with the buffer). The client is not threadsafe, i.e. concurrent buffer pool
+/// operations should not be invoked for the same client.
+///
+/// TODO:
+/// * Implement spill-to-disk.
+/// * Decide on, document, and enforce upper limits on page size.
+///
+/// Pages, Buffers and Pinning
+/// ==========================
+/// * A page is a logical block of memory that can reside in memory or on disk.
+/// * A buffer is a physical block of memory that can hold a page in memory.
+/// * A page handle is used by buffer pool clients to identify and access a 
page and
+///   the corresponding buffer. Clients do not interact with pages directly.
+/// * A buffer handle is used by buffer pool clients to identify and access a 
buffer.
+/// * A page is pinned if it has pin count > 0. A pinned page stays mapped to 
the same
+///   buffer.
+/// * An unpinned page can be written out to disk by the buffer pool so that 
the buffer
+///   can be used for another purpose.
+///
+/// Buffer/Page Sizes
+/// =================
+/// The buffer pool has a minimum buffer size, which must be a power-of-two. 
Page and
+/// buffer sizes must be an exact power-of-two multiple of the minimum buffer 
size.
+///
+/// Reservations
+/// ============
+/// Before allocating buffers or pinning pages, a client must reserve memory 
through its
+/// ReservationTracker. Reservation of n bytes gives a client the right to 
allocate
+/// buffers or pin pages summing up to n bytes. Reservations are both 
necessary and
+/// sufficient for a client to allocate buffers or pin pages: the operations 
succeed
+/// unless a "system error" such as a disk write error is encountered that 
prevents
+/// unpinned pages from being written to disk.
+///
+/// More memory may be reserved than is used, e.g. if a client is not using 
its full
+/// reservation. In such cases, the buffer pool can use the free buffers in 
any way,
+/// e.g. for keeping unpinned pages in memory, so long as it is able to 
fulfill the
+/// reservations when needed, e.g. by flushing unpinned pages to disk.
+///
+/// Page/Buffer Handles
+/// ===================
+/// The buffer pool exposes PageHandles and BufferHandles, which are owned by 
clients of
+/// the buffer pool, and act as a proxy for the internal data structure 
representing the
+/// page or buffer in the buffer pool. Handles are "open" if they are 
associated with a
+/// page or buffer. An open PageHandle is obtained by creating a page. 
PageHandles are
+/// closed by calling BufferPool::DestroyPage(). An open BufferHandle is 
obtained by
+/// allocating a buffer or extracting a BufferHandle from a PageHandle. A 
page's buffer
+/// can also be accessed through the PageHandle. The handle destructors check 
for
+/// resource leaks, e.g. an open handle that would result in a buffer leak.
+///
+/// Pin Counting of Page Handles:
+/// ----------------------------------
+/// Page handles are scoped to a client. The invariants are as follows:
+/// * A page can only be accessed through an open handle.
+/// * A page is destroyed once the handle is destroyed via DestroyPage().
+/// * A page's buffer can only be accessed through a pinned handle.
+/// * Pin() can be called on an open handle, incrementing the handle's pin 
count.
+/// * Unpin() can be called on a pinned handle, but not an unpinned handle.
+/// * Pin() always increases usage of reservations, and Unpin() always 
decreases usage,
+///   i.e. the handle consumes <pin count> * <page size> bytes of reservation.
+///
+/// Example Usage: Buffers
+/// ==================================
+/// The simplest use case is to allocate a memory buffer.
+/// * The new buffer is created with AllocateBuffer().
+/// * The client reads and writes to the buffer as it sees fit.
+/// * If the client is done with the buffer's contents it can call 
FreeBuffer() to
+///   destroy the handle and free the buffer, or use TransferBuffer() to 
transfer
+///   the buffer to a different client.
+///
+/// Example Usage: Spillable Pages
+/// ==============================
+/// * A spilling operator creates a new page with CreatePage().
+/// * The client reads and writes to the page's buffer as it sees fit.
+/// * If the operator encounters memory pressure, it can decrease reservation 
usage by
+///   calling Unpin() on the page. The page may then be written to disk and 
its buffer
+///   repurposed internally by BufferPool.
+/// * Once the operator needs the page's contents again and has sufficient 
unused
+///   reservation, it can call Pin(), which brings the page's contents back 
into memory,
+///   perhaps in a different buffer. Therefore the operator must fix up any 
pointers into
+///   the previous buffer.
+/// * If the operator is done with the page, it can call DestroyPage() to 
destroy the
+///   handle and release resources, or call ExtractBuffer() to extract the 
buffer.
+///
+/// Synchronization
+/// ===============
+/// The data structures in the buffer pool itself are thread-safe. 
Client-owned data
+/// structures - Client, PageHandle and BufferHandle - are not protected from 
concurrent
+/// access by the buffer pool: clients must ensure that they do not invoke 
concurrent
+/// operations with the same Client, PageHandle or BufferHandle.
+//
+/// +========================+
+/// | IMPLEMENTATION DETAILS |
+/// +========================+
+///
+/// Lock Ordering
+/// =============
+/// The lock ordering is:
+/// * pages_::lock_ -> Page::lock_
+///
+/// If a reference to a page is acquired via the pages_ list, pages_::lock_ 
must be held
+/// until done with the page to ensure the page isn't concurrently deleted.
+class BufferPool {
+ public:
+  class BufferHandle;
+  class Client;
+  class PageHandle;
+
+  /// Constructs a new buffer pool.
+  /// 'min_buffer_len': the minimum buffer length for the pool. Must be a 
power of two.
+  /// 'buffer_bytes_limit': the maximum physical memory in bytes that can be 
used by the
+  ///     buffer pool. If 'buffer_bytes_limit' is not a multiple of 
'min_buffer_len', the
+  ///     remainder will not be usable.
+  BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit);
+  ~BufferPool();
+
+  /// Register a client. Returns an error status and does not register the 
client if the
+  /// arguments are invalid. 'name' is an arbitrary name used to identify the 
client in
+/// any error messages or logging. Counters for this client are added to 
the (non-NULL)
+  /// 'profile'. 'client' is the client to register. 'client' should not 
already be
+  /// registered.
+  Status RegisterClient(const std::string& name, ReservationTracker* 
reservation,
+      RuntimeProfile* profile, Client* client);
+
+  /// Deregister 'client' if it is registered. Idempotent.
+  void DeregisterClient(Client* client);
+
+  /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page 
length
+  /// supported by BufferPool (see BufferPool class comment). The client must 
have
+  /// sufficient unused reservation to pin the new page (otherwise it will 
DCHECK).
+  /// CreatePage() only fails when a system error prevents the buffer pool 
from fulfilling
+  /// the reservation.
+  /// On success, the handle is mapped to the new page.
+  Status CreatePage(Client* client, int64_t len, PageHandle* handle);
+
+  /// Increment the pin count of 'handle'. After Pin() the underlying page will
+  /// be mapped to a buffer, which will be accessible through 'handle'. Uses
+  /// reservation from 'client'. The caller is responsible for ensuring it has 
enough
+  /// unused reservation before calling Pin() (otherwise it will DCHECK). 
Pin() only
+  /// fails when a system error prevents the buffer pool from fulfilling the 
reservation.
+  /// 'handle' must be open.
+  Status Pin(Client* client, PageHandle* handle);
+
+  /// Decrement the pin count of 'handle'. Decrease client's reservation 
usage. If the
+  /// handle's pin count becomes zero, it is no longer valid for the 
underlying page's
+  /// buffer to be accessed via 'handle'. If the page's total pin count across 
all
+  /// handles that reference it goes to zero, the page's data may be written 
to disk and
+  /// the buffer reclaimed. 'handle' must be open and have a pin count > 0.
+  /// TODO: once we implement spilling, it will be an error to call Unpin() 
with
+  /// spilling disabled. E.g. if Impala is running without scratch (we want to 
be
+  /// able to test Unpin() before we implement actual spilling).
+  void Unpin(Client* client, PageHandle* handle);
+
+  /// Destroy the page referenced by 'handle' (if 'handle' is open). Any 
buffers or disk
+  /// storage backing the page are freed. Idempotent. If the page is pinned, 
the
+  /// reservation usage is decreased accordingly.
+  void DestroyPage(Client* client, PageHandle* handle);
+
+  /// Extracts buffer from a pinned page. After this returns, the page 
referenced by
+  /// 'page_handle' will be destroyed and 'buffer_handle' will reference the 
buffer from
+  /// 'page_handle'. This may decrease reservation usage of 'client' if the 
page was
+  /// pinned multiple times via 'page_handle'.
+  void ExtractBuffer(
+      Client* client, PageHandle* page_handle, BufferHandle* buffer_handle);
+
+  /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. 
The caller
+  /// is responsible for ensuring it has enough unused reservation before 
calling
+  /// AllocateBuffer() (otherwise it will DCHECK). AllocateBuffer() only fails 
when
+  /// a system error prevents the buffer pool from fulfilling the reservation.
+  Status AllocateBuffer(Client* client, int64_t len, BufferHandle* handle);
+
+  /// If 'handle' is open, close 'handle', free the buffer and decrease the
+  /// reservation usage from 'client'. Idempotent.
+  void FreeBuffer(Client* client, BufferHandle* handle);
+
+  /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move 
the
+  /// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' 
and
+  /// decreases reservation usage in 'src_client'. 'src' must be open and 
'dst' must be
+  /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be 
different.
+  /// After a successful call, 'src' is closed and 'dst' is open.
+  Status TransferBuffer(Client* src_client, BufferHandle* src, Client* 
dst_client,
+      BufferHandle* dst);
+
+  /// Print a debug string with the state of the buffer pool.
+  std::string DebugString();
+
+  int64_t min_buffer_len() const;
+  int64_t buffer_bytes_limit() const;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(BufferPool);
+  struct Page;
+
+  /// Same as Unpin(), except the lock for the page referenced by 'handle' 
must be held
+  /// by the caller.
+  void UnpinLocked(Client* client, PageHandle* handle);
+
+  /// Perform the cleanup of the page object and handle when the page is 
destroyed.
+  /// Reset 'handle', free the Page object and remove the 'pages_' entry.
+  /// The 'handle->page_' lock should *not* be held by the caller.
+  void CleanUpPage(PageHandle* handle);
+
+  /// Allocate a buffer of length 'len'. Assumes that the client's reservation 
has already
+  /// been consumed for the buffer. Returns an error if the pool is unable to 
fulfill the
+  /// reservation.
+  Status AllocateBufferInternal(Client* client, int64_t len, BufferHandle* 
buffer);
+
+  /// Frees 'buffer', which must be open before calling. Closes 'buffer' and 
updates
+  /// internal state but does not release to any reservation.
+  void FreeBufferInternal(BufferHandle* buffer);
+
+  /// Check if we can allocate another buffer of size 'len' bytes without
+  /// 'buffer_bytes_remaining_' going negative.
+  /// Returns true and decrease 'buffer_bytes_remaining_' by 'len' if 
successful.
+  bool TryDecreaseBufferBytesRemaining(int64_t len);
+
+  /// Allocator for allocating and freeing all buffer memory.
+  boost::scoped_ptr<BufferAllocator> allocator_;
+
+  /// The minimum length of a buffer in bytes. All buffers and pages are a 
power-of-two
+  /// multiple of this length. This is always a power of two.
+  const int64_t min_buffer_len_;
+
+  /// The maximum physical memory in bytes that can be used for buffers.
+  const int64_t buffer_bytes_limit_;
+
+  /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used 
for
+  /// allocating new buffers. Must be updated atomically before a new buffer is
+  /// allocated or after an existing buffer is freed.
+  AtomicInt64 buffer_bytes_remaining_;
+
+  /// List containing all pages. Protected by the list's internal lock.
+  typedef InternalQueue<Page> PageList;
+  PageList pages_;
+};
+
+/// External representation of a client of the BufferPool. Clients are used for
+/// reservation accounting, and will be used in the future for tracking 
per-client
+/// buffer pool counters. This class is the external handle for a client so
+/// each Client instance is owned by the BufferPool's client, rather than the 
BufferPool.
+/// Each Client should only be used by a single thread at a time: concurrently 
calling
+/// Client methods or BufferPool methods with the Client as an argument is not 
supported.
+class BufferPool::Client {
+ public:
+  Client() : reservation_(NULL) {}
+  /// Client must be deregistered.
+  ~Client() { DCHECK(!is_registered()); }
+
+  bool is_registered() const { return reservation_ != NULL; }
+  ReservationTracker* reservation() { return reservation_; }
+
+  std::string DebugString() const;
+
+ private:
+  friend class BufferPool;
+  DISALLOW_COPY_AND_ASSIGN(Client);
+
+  /// Initialize 'counters_' and add the counters to 'profile'.
+  void InitCounters(RuntimeProfile* profile);
+
+  /// A name identifying the client.
+  std::string name_;
+
+  /// The reservation tracker for the client. NULL means the client isn't 
registered.
+  /// All pages pinned by the client count as usage against 'reservation_'.
+  ReservationTracker* reservation_;
+
+  /// The RuntimeProfile counters for this client. All non-NULL if 
is_registered()
+  /// is true.
+  BufferPoolClientCounters counters_;
+};
+
+/// A handle to a buffer allocated from the buffer pool. Each BufferHandle 
should only
+/// be used by a single thread at a time: concurrently calling BufferHandle 
methods or
+/// BufferPool methods with the BufferHandle as an argument is not supported.
+class BufferPool::BufferHandle {
+ public:
+  BufferHandle();
+  ~BufferHandle() { DCHECK(!is_open()); }
+
+  /// Allow move construction of handles, to support std::move().
+  BufferHandle(BufferHandle&& src);
+
+  /// Allow move assignment of handles, to support STL classes like 
std::vector.
+  /// Destination must be uninitialized.
+  BufferHandle& operator=(BufferHandle&& src);
+
+  bool is_open() const { return data_ != NULL; }
+  int64_t len() const {
+    DCHECK(is_open());
+    return len_;
+  }
+  /// Get a pointer to the start of the buffer.
+  uint8_t* data() const {
+    DCHECK(is_open());
+    return data_;
+  }
+
+  std::string DebugString() const;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(BufferHandle);
+  friend class BufferPool;
+
+  /// Internal helper to set the handle to an opened state.
+  void Open(const Client* client, uint8_t* data, int64_t len);
+
+  /// Internal helper to reset the handle to an unopened state.
+  void Reset();
+
+  /// The client the buffer handle belongs to, used to validate that the 
correct client
+  /// is provided in BufferPool method calls.
+  const Client* client_;
+
+  /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed.
+  uint8_t* data_;
+
+  /// Length of the buffer in bytes.
+  int64_t len_;
+};
+
+/// The handle for a page used by clients of the BufferPool. Each PageHandle 
should
+/// only be used by a single thread at a time: concurrently calling PageHandle 
methods
+/// or BufferPool methods with the PageHandle as an argument is not supported.
+class BufferPool::PageHandle {
+ public:
+  PageHandle();
+  ~PageHandle() { DCHECK(!is_open()); }
+
+  // Allow move construction of page handles, to support std::move().
+  PageHandle(PageHandle&& src);
+
+  // Allow move assignment of page handles, to support STL classes like 
std::vector.
+  // Destination must be closed.
+  PageHandle& operator=(PageHandle&& src);
+
+  bool is_open() const { return page_ != NULL; }
+  bool is_pinned() const { return pin_count() > 0; }
+  int pin_count() const;
+  int64_t len() const;
+  /// Get a pointer to the start of the page's buffer. Only valid to call if 
the page
+  /// is pinned via this handle.
+  uint8_t* data() const { return buffer_handle()->data(); }
+
+  /// Return a pointer to the page's buffer handle. Only valid to call if the 
page is
+  /// pinned via this handle. Only const accessors of the returned handle can 
be used:
+  /// it is invalid to call FreeBuffer() or TransferBuffer() on it or to 
otherwise modify
+  /// the handle.
+  const BufferHandle* buffer_handle() const;
+
+  std::string DebugString() const;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(PageHandle);
+  friend class BufferPool;
+  friend class Page;
+
+  /// Internal helper to open the handle for the given page.
+  void Open(Page* page, Client* client);
+
+  /// Internal helper to reset the handle to an unopened state.
+  void Reset();
+
+  /// The internal page structure. NULL if the handle is not open.
+  Page* page_;
+
+  /// The client the page handle belongs to, used to validate that the correct 
client
+  /// is being used.
+  const Client* client_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/reservation-tracker-counters.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-counters.h 
b/be/src/runtime/bufferpool/reservation-tracker-counters.h
new file mode 100644
index 0000000..8124e10
--- /dev/null
+++ b/be/src/runtime/bufferpool/reservation-tracker-counters.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_RESERVATION_TRACKER_COUNTERS_H
+#define IMPALA_RUNTIME_RESERVATION_TRACKER_COUNTERS_H
+
+#include "util/runtime-profile.h"
+
+namespace impala {
+
+/// A set of counters for each ReservationTracker for reporting purposes.
+///
+/// If the ReservationTracker is linked to a profile these have the same 
lifetime as that
+/// profile, otherwise they have the same lifetime as the ReservationTracker 
itself.
+struct ReservationTrackerCounters {
+  /// The tracker's peak reservation in bytes.
+  RuntimeProfile::HighWaterMarkCounter* peak_reservation;
+
+  /// The tracker's peak usage in bytes.
+  RuntimeProfile::HighWaterMarkCounter* peak_used_reservation;
+
+  /// The hard limit on the tracker's reservations
+  RuntimeProfile::Counter* reservation_limit;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc 
b/be/src/runtime/bufferpool/reservation-tracker-test.cc
new file mode 100644
index 0000000..416a53e
--- /dev/null
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -0,0 +1,378 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "common/init.h"
+#include "common/object-pool.h"
+#include "runtime/mem-tracker.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+class ReservationTrackerTest : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  virtual void TearDown() {
+    root_.Close();
+    obj_pool_.Clear();
+  }
+
+  /// The minimum allocation size used in most tests.
+  const static int64_t MIN_BUFFER_LEN = 1024;
+
+ protected:
+  RuntimeProfile* NewProfile() {
+    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+  }
+
+  ObjectPool obj_pool_;
+
+  ReservationTracker root_;
+
+  scoped_ptr<RuntimeProfile> profile_;
+};
+
+const int64_t ReservationTrackerTest::MIN_BUFFER_LEN;
+
+TEST_F(ReservationTrackerTest, BasicSingleTracker) {
+  const int64_t limit = 16;
+  root_.InitRootTracker(NULL, limit);
+  ASSERT_EQ(0, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+
+  // Fail to increase reservation.
+  ASSERT_FALSE(root_.IncreaseReservation(limit + 1));
+  ASSERT_EQ(0, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(0, root_.GetUnusedReservation());
+
+  // Successfully increase reservation.
+  ASSERT_TRUE(root_.IncreaseReservation(limit - 1));
+  ASSERT_EQ(limit - 1, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(limit - 1, root_.GetUnusedReservation());
+
+  // Adjust usage.
+  root_.AllocateFrom(2);
+  ASSERT_EQ(limit - 1, root_.GetReservation());
+  ASSERT_EQ(2, root_.GetUsedReservation());
+  ASSERT_EQ(limit - 3, root_.GetUnusedReservation());
+  root_.ReleaseTo(1);
+  ASSERT_EQ(1, root_.GetUsedReservation());
+  root_.ReleaseTo(1);
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(limit - 1, root_.GetReservation());
+  ASSERT_EQ(limit - 1, root_.GetUnusedReservation());
+}
+
+TEST_F(ReservationTrackerTest, BasicTwoLevel) {
+  const int64_t limit = 16;
+  root_.InitRootTracker(NULL, limit);
+
+  const int64_t root_reservation = limit / 2;
+  // Get half of the limit as an initial reservation.
+  ASSERT_TRUE(root_.IncreaseReservation(root_reservation));
+  ASSERT_EQ(root_reservation, root_.GetReservation());
+  ASSERT_EQ(root_reservation, root_.GetUnusedReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(0, root_.GetChildReservations());
+
+  ReservationTracker child;
+  child.InitChildTracker(NULL, &root_, NULL, numeric_limits<int64_t>::max());
+
+  const int64_t child_reservation = root_reservation + 1;
+  // Get parent's reservation plus a bit more.
+  ASSERT_TRUE(child.IncreaseReservation(child_reservation));
+
+  ASSERT_EQ(child_reservation, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUnusedReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(child_reservation, root_.GetChildReservations());
+
+  ASSERT_EQ(child_reservation, child.GetReservation());
+  ASSERT_EQ(child_reservation, child.GetUnusedReservation());
+  ASSERT_EQ(0, child.GetUsedReservation());
+  ASSERT_EQ(0, child.GetChildReservations());
+
+  // Check that child allocation is reflected correctly.
+  child.AllocateFrom(1);
+  ASSERT_EQ(child_reservation, child.GetReservation());
+  ASSERT_EQ(1, child.GetUsedReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+
+  // Check that parent reservation increase is reflected correctly.
+  ASSERT_TRUE(root_.IncreaseReservation(1));
+  ASSERT_EQ(child_reservation + 1, root_.GetReservation());
+  ASSERT_EQ(1, root_.GetUnusedReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(child_reservation, root_.GetChildReservations());
+  ASSERT_EQ(child_reservation, child.GetReservation());
+
+  // Check that parent allocation is reflected correctly.
+  root_.AllocateFrom(1);
+  ASSERT_EQ(child_reservation + 1, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUnusedReservation());
+  ASSERT_EQ(1, root_.GetUsedReservation());
+
+  // Release allocations.
+  root_.ReleaseTo(1);
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  child.ReleaseTo(1);
+  ASSERT_EQ(0, child.GetUsedReservation());
+
+  // Child reservation should be returned all the way up the tree.
+  child.DecreaseReservation(1);
+  ASSERT_EQ(child_reservation, root_.GetReservation());
+  ASSERT_EQ(child_reservation - 1, child.GetReservation());
+  ASSERT_EQ(child_reservation - 1, root_.GetChildReservations());
+
+  // Closing the child should release its reservation.
+  child.Close();
+  ASSERT_EQ(1, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetChildReservations());
+}
+
+TEST_F(ReservationTrackerTest, CloseIdempotency) {
+  // Check we can close before opening.
+  root_.Close();
+
+  const int64_t limit = 16;
+  root_.InitRootTracker(NULL, limit);
+
+  // Check we can close twice
+  root_.Close();
+  root_.Close();
+}
+
+// Test that the tracker's reservation limit is enforced.
+TEST_F(ReservationTrackerTest, ReservationLimit) {
+  Status status;
+  // Setup trackers so that there is a spare buffer that the client isn't 
entitled to.
+  int64_t total_mem = MIN_BUFFER_LEN * 3;
+  int64_t client_limit = MIN_BUFFER_LEN * 2;
+  root_.InitRootTracker(NULL, total_mem);
+
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(NULL, &root_, NULL, client_limit);
+
+  // We can only increase reservation up to the client's limit.
+  ASSERT_FALSE(client_tracker->IncreaseReservation(client_limit + 1));
+  ASSERT_TRUE(client_tracker->IncreaseReservation(client_limit));
+  ASSERT_FALSE(client_tracker->IncreaseReservation(1));
+
+  client_tracker->Close();
+}
+
+// Test that parent's reservation limit is enforced.
+TEST_F(ReservationTrackerTest, ParentReservationLimit) {
+  Status status;
+  // Setup reservations so that there is a spare buffer.
+  int64_t total_mem = MIN_BUFFER_LEN * 4;
+  int64_t parent_limit = MIN_BUFFER_LEN * 3;
+  int64_t other_client_reservation = MIN_BUFFER_LEN;
+  root_.InitRootTracker(NULL, total_mem);
+
+  // The child reservation limit is higher than the parent reservation limit, 
so the
+  // parent limit is the effective limit.
+  ReservationTracker* query_tracker = obj_pool_.Add(new ReservationTracker());
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  query_tracker->InitChildTracker(NULL, &root_, NULL, parent_limit);
+  client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem * 10);
+
+  ReservationTracker* other_client_tracker = obj_pool_.Add(new 
ReservationTracker());
+  other_client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem);
+  
ASSERT_TRUE(other_client_tracker->IncreaseReservationToFit(other_client_reservation));
+  ASSERT_EQ(root_.GetUsedReservation(), 0);
+  ASSERT_EQ(root_.GetChildReservations(), other_client_reservation);
+  ASSERT_EQ(query_tracker->GetUsedReservation(), 0);
+  ASSERT_EQ(query_tracker->GetUnusedReservation(), 0);
+
+  // Can only increase reservation up to parent limit, excluding other 
reservations.
+  int64_t effective_limit = parent_limit - other_client_reservation;
+  ASSERT_FALSE(client_tracker->IncreaseReservation(effective_limit + 
MIN_BUFFER_LEN));
+  ASSERT_TRUE(client_tracker->IncreaseReservation(effective_limit));
+  ASSERT_FALSE(client_tracker->IncreaseReservation(MIN_BUFFER_LEN));
+
+  // Check that tracker hierarchy reports correct usage.
+  ASSERT_EQ(root_.GetUsedReservation(), 0);
+  ASSERT_EQ(root_.GetChildReservations(), parent_limit);
+  ASSERT_EQ(query_tracker->GetUsedReservation(), 0);
+  ASSERT_EQ(query_tracker->GetUnusedReservation(), 0);
+
+  client_tracker->Close();
+  other_client_tracker->Close();
+  query_tracker->Close();
+}
+
/// Test integration of ReservationTracker with MemTracker.
TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) {
  // Setup a 2-level hierarchy of trackers. The child ReservationTracker is linked to
  // the child MemTracker. We add various limits at different places to enable testing
  // of different code paths.
  root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100);
  MemTracker root_mem_tracker;
  // Child 1 has no MemTracker limit; Child 2 has a limit below its reservation limit.
  MemTracker child_mem_tracker1(-1, "Child 1", &root_mem_tracker);
  MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, "Child 2", &root_mem_tracker);
  ReservationTracker child_reservations1, child_reservations2;
  child_reservations1.InitChildTracker(
      NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN);
  child_reservations2.InitChildTracker(
      NewProfile(), &root_, &child_mem_tracker2, 75 * MIN_BUFFER_LEN);

  // Check that a single buffer reservation is accounted correctly.
  ASSERT_TRUE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN));
  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
  ASSERT_EQ(MIN_BUFFER_LEN, root_mem_tracker.consumption());
  ASSERT_EQ(MIN_BUFFER_LEN, root_.GetChildReservations());

  // Check that a buffer reservation from the other child is accounted correctly.
  ASSERT_TRUE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN));
  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());

  // Check that hitting the MemTracker limit leaves things in a consistent state:
  // the failed increase must not change any reservation or consumption.
  ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 50));
  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());

  // Check that hitting the ReservationTracker's local limit leaves things in a
  // consistent state.
  ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 75));
  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());

  // Check that hitting the ReservationTracker's parent's limit after the
  // MemTracker consumption is incremented leaves things in a consistent state.
  ASSERT_FALSE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN * 100));
  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());

  // Check that released memory is decremented from all trackers correctly.
  child_reservations1.DecreaseReservation(MIN_BUFFER_LEN);
  child_reservations2.DecreaseReservation(MIN_BUFFER_LEN);
  ASSERT_EQ(0, child_reservations2.GetReservation());
  ASSERT_EQ(0, child_mem_tracker2.consumption());
  ASSERT_EQ(0, root_mem_tracker.consumption());
  ASSERT_EQ(0, root_.GetUsedReservation());

  // Trackers must be closed (and MemTrackers unregistered) before destruction.
  child_reservations1.Close();
  child_reservations2.Close();
  child_mem_tracker1.UnregisterFromParent();
  child_mem_tracker2.UnregisterFromParent();
}
+
TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
  const int HIERARCHY_DEPTH = 5;
  // Setup a multi-level hierarchy of trackers and ensure that consumption is reported
  // correctly.
  ReservationTracker reservations[HIERARCHY_DEPTH];
  scoped_ptr<MemTracker> mem_trackers[HIERARCHY_DEPTH];

  // We can only handle MemTracker limits at the topmost linked ReservationTracker,
  // so avoid adding limits at lower level.
  const int LIMIT = HIERARCHY_DEPTH;
  vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1});

  // Root trackers aren't linked (the root ReservationTracker has no MemTracker).
  mem_trackers[0].reset(new MemTracker(mem_limits[0]));
  reservations[0].InitRootTracker(NewProfile(), 500);
  for (int i = 1; i < HIERARCHY_DEPTH; ++i) {
    mem_trackers[i].reset(new MemTracker(
        mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get()));
    reservations[i].InitChildTracker(
        NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500);
  }

  // Amounts just below, at, and just above the tightest MemTracker limit.
  vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1});

  // Test that all limits and consumption correctly reported when consuming
  // from a non-root ReservationTracker that is connected to a MemTracker.
  for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
    int64_t lowest_limit = mem_trackers[level]->lowest_limit();
    for (int amount : interesting_amounts) {
      bool increased = reservations[level].IncreaseReservation(amount);
      if (lowest_limit == -1 || amount <= lowest_limit) {
        // The increase should go through.
        ASSERT_TRUE(increased) << reservations[level].DebugString();
        ASSERT_EQ(amount, reservations[level].GetReservation());
        ASSERT_EQ(amount, mem_trackers[level]->consumption());
        // Every ancestor must reflect the reservation as child reservation and
        // MemTracker consumption.
        for (int ancestor = 0; ancestor < level; ++ancestor) {
          ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
          ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
        }

        LOG(INFO) << "\n" << mem_trackers[0]->LogUsage();
        reservations[level].DecreaseReservation(amount);
      } else {
        ASSERT_FALSE(increased);
      }
      // We should be back in the original state, whether or not the increase
      // succeeded (failed increases must not leak partial state).
      for (int i = 0; i < HIERARCHY_DEPTH; ++i) {
        ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
                                                       << reservations[i].DebugString();
        ASSERT_EQ(0, reservations[i].GetChildReservations());
        ASSERT_EQ(0, mem_trackers[i]->consumption());
      }
    }
  }

  // "Pull down" a reservation from the top of the hierarchy level-by-level to the
  // leaves, checking that everything is consistent at each step.
  for (int level = 0; level < HIERARCHY_DEPTH; ++level) {
    const int amount = LIMIT;
    ASSERT_TRUE(reservations[level].IncreaseReservation(amount));
    ASSERT_EQ(amount, reservations[level].GetReservation());
    ASSERT_EQ(0, reservations[level].GetUsedReservation());
    if (level != 0) ASSERT_EQ(amount, mem_trackers[level]->consumption());
    for (int ancestor = 0; ancestor < level; ++ancestor) {
      ASSERT_EQ(0, reservations[ancestor].GetUsedReservation());
      ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
      ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
    }
    reservations[level].DecreaseReservation(amount);
  }

  // Close bottom-up: children must be closed before their parents.
  for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) {
    reservations[i].Close();
    if (i != 0) mem_trackers[i]->UnregisterFromParent();
  }
}
+}
+
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc 
b/be/src/runtime/bufferpool/reservation-tracker.cc
new file mode 100644
index 0000000..7138d69
--- /dev/null
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/bufferpool/reservation-tracker.h"
+
+#include <algorithm>
+
+#include "common/object-pool.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/mem-tracker.h"
+#include "util/dummy-runtime-profile.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
// Trackers start uninitialized; the remaining state is set up by InitRootTracker()
// or InitChildTracker().
ReservationTracker::ReservationTracker() : initialized_(false), mem_tracker_(NULL) {}
+
ReservationTracker::~ReservationTracker() {
  // Close() must have been called (or the tracker never initialized) before
  // destruction.
  DCHECK(!initialized_);
}
+
+void ReservationTracker::InitRootTracker(
+    RuntimeProfile* profile, int64_t reservation_limit) {
+  lock_guard<SpinLock> l(lock_);
+  DCHECK(!initialized_);
+  parent_ = NULL;
+  mem_tracker_ = NULL;
+  reservation_limit_ = reservation_limit;
+  reservation_ = 0;
+  used_reservation_ = 0;
+  child_reservations_ = 0;
+  initialized_ = true;
+
+  InitCounters(profile, reservation_limit_);
+  COUNTER_SET(counters_.peak_reservation, reservation_);
+
+  CheckConsistency();
+}
+
+void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
+    ReservationTracker* parent, MemTracker* mem_tracker, int64_t 
reservation_limit) {
+  DCHECK(parent != NULL);
+  DCHECK_GE(reservation_limit, 0);
+
+  lock_guard<SpinLock> l(lock_);
+  DCHECK(!initialized_);
+  parent_ = parent;
+  mem_tracker_ = mem_tracker;
+
+  reservation_limit_ = reservation_limit;
+  reservation_ = 0;
+  used_reservation_ = 0;
+  child_reservations_ = 0;
+  initialized_ = true;
+
+  if (mem_tracker_ != NULL) {
+    MemTracker* parent_mem_tracker = GetParentMemTracker();
+    if (parent_mem_tracker != NULL) {
+      // Make sure the parent links of the MemTrackers correspond to our 
parent links.
+      DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
+      // Make sure we don't have a lower limit than the ancestor, since we 
don't enforce
+      // limits at lower links.
+      DCHECK_EQ(mem_tracker_->lowest_limit(), 
parent_mem_tracker->lowest_limit());
+    } else {
+      // Make sure we didn't leave a gap in the links. E.g. this tracker's 
grandparent
+      // shouldn't have a MemTracker.
+      ReservationTracker* ancestor = parent_;
+      while (ancestor != NULL) {
+        DCHECK(ancestor->mem_tracker_ == NULL);
+        ancestor = ancestor->parent_;
+      }
+    }
+  }
+
+  InitCounters(profile, reservation_limit_);
+
+  CheckConsistency();
+}
+
+void ReservationTracker::InitCounters(
+    RuntimeProfile* profile, int64_t reservation_limit) {
+  bool profile_provided = profile != NULL;
+  if (profile == NULL) {
+    dummy_profile_.reset(new DummyProfile);
+    profile = dummy_profile_->profile();
+  }
+
+  // Check that another tracker's counters aren't already registered in the 
profile.
+  DCHECK(profile->GetCounter("BufferPoolInitialReservation") == NULL);
+  counters_.reservation_limit =
+      ADD_COUNTER(profile, "BufferPoolReservationLimit", TUnit::BYTES);
+  counters_.peak_reservation =
+      profile->AddHighWaterMarkCounter("BufferPoolPeakReservation", 
TUnit::BYTES);
+  counters_.peak_used_reservation =
+      profile->AddHighWaterMarkCounter("BufferPoolPeakUsedReservation", 
TUnit::BYTES);
+
+  COUNTER_SET(counters_.reservation_limit, reservation_limit);
+
+  if (mem_tracker_ != NULL && profile_provided) {
+    mem_tracker_->EnableReservationReporting(counters_);
+  }
+}
+
void ReservationTracker::Close() {
  lock_guard<SpinLock> l(lock_);
  if (!initialized_) return;  // Close() is a no-op on uninitialized/closed trackers.
  CheckConsistency();
  // Preconditions: all reservation unused and all children already closed.
  DCHECK_EQ(used_reservation_, 0);
  DCHECK_EQ(child_reservations_, 0);
  // Release any reservation to parent.
  if (parent_ != NULL) DecreaseReservationInternalLocked(reservation_, false);
  mem_tracker_ = NULL;
  parent_ = NULL;
  initialized_ = false;
}
+
bool ReservationTracker::IncreaseReservation(int64_t bytes) {
  lock_guard<SpinLock> l(lock_);
  // Increase by exactly 'bytes' (does not reuse this tracker's unused reservation).
  return IncreaseReservationInternalLocked(bytes, false, false);
}
+
bool ReservationTracker::IncreaseReservationToFit(int64_t bytes) {
  lock_guard<SpinLock> l(lock_);
  // Only increase by the shortfall needed so that 'bytes' of unused reservation is
  // available.
  return IncreaseReservationInternalLocked(bytes, true, false);
}
+
+bool ReservationTracker::IncreaseReservationInternalLocked(
+    int64_t bytes, bool use_existing_reservation, bool is_child_reservation) {
+  DCHECK(initialized_);
+  int64_t reservation_increase =
+      use_existing_reservation ? max<int64_t>(0, bytes - unused_reservation()) 
: bytes;
+  DCHECK_GE(reservation_increase, 0);
+
+  bool granted;
+  // Check if the increase is allowed, starting at the bottom of hierarchy.
+  if (reservation_ + reservation_increase > reservation_limit_) {
+    granted = false;
+  } else if (reservation_increase == 0) {
+    granted = true;
+  } else {
+    if (parent_ == NULL) {
+      granted = true;
+    } else {
+      lock_guard<SpinLock> l(parent_->lock_);
+      granted =
+          parent_->IncreaseReservationInternalLocked(reservation_increase, 
true, true);
+    }
+    if (granted && !TryUpdateMemTracker(reservation_increase)) {
+      granted = false;
+      // Roll back changes to ancestors if MemTracker update fails.
+      parent_->DecreaseReservationInternal(reservation_increase, true);
+    }
+  }
+
+  if (granted) {
+    // The reservation was granted and state updated in all ancestors: we can 
modify
+    // this tracker's state now.
+    UpdateReservation(reservation_increase);
+    if (is_child_reservation) child_reservations_ += bytes;
+  }
+
+  CheckConsistency();
+  return granted;
+}
+
+bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase) {
+  if (mem_tracker_ == NULL) return true;
+  if (GetParentMemTracker() == NULL) {
+    // At the topmost link, which may be a MemTracker with a limit, we need to 
use
+    // TryConsume() to check the limit.
+    return mem_tracker_->TryConsume(reservation_increase);
+  } else {
+    // For lower links, there shouldn't be a limit to enforce, so we just need 
to
+    // update the consumption of the linked MemTracker since the reservation is
+    // already reflected in its parent.
+    mem_tracker_->ConsumeLocal(reservation_increase, GetParentMemTracker());
+    return true;
+  }
+}
+
void ReservationTracker::DecreaseReservation(int64_t bytes) {
  // Public entry point: this tracker's own reservation decrease, not a child's.
  DecreaseReservationInternal(bytes, false);
}
+
void ReservationTracker::DecreaseReservationInternal(
    int64_t bytes, bool is_child_reservation) {
  // Acquire 'lock_' then delegate; see DecreaseReservationInternalLocked().
  lock_guard<SpinLock> l(lock_);
  DecreaseReservationInternalLocked(bytes, is_child_reservation);
}
+
void ReservationTracker::DecreaseReservationInternalLocked(
    int64_t bytes, bool is_child_reservation) {
  DCHECK(initialized_);
  DCHECK_GE(reservation_, bytes);
  if (bytes == 0) return;
  if (is_child_reservation) child_reservations_ -= bytes;
  UpdateReservation(-bytes);
  // The reservation should be returned up the tree.
  if (mem_tracker_ != NULL) {
    if (GetParentMemTracker() == NULL) {
      // Topmost link: release against the (possibly limit-enforcing) MemTracker.
      mem_tracker_->Release(bytes);
    } else {
      // Lower link: the release is reflected in the parent MemTracker separately.
      mem_tracker_->ReleaseLocal(bytes, GetParentMemTracker());
    }
  }
  // Recurse upwards; the parent takes its own lock (bottom-up order, our lock held).
  if (parent_ != NULL) parent_->DecreaseReservationInternal(bytes, true);
  CheckConsistency();
}
+
void ReservationTracker::AllocateFrom(int64_t bytes) {
  lock_guard<SpinLock> l(lock_);
  DCHECK(initialized_);
  DCHECK_GE(bytes, 0);
  // Precondition (caller's responsibility, see class comment on thread-safety):
  // enough unused reservation must be available.
  DCHECK_LE(bytes, unused_reservation());
  UpdateUsedReservation(bytes);
  CheckConsistency();
}
+
void ReservationTracker::ReleaseTo(int64_t bytes) {
  lock_guard<SpinLock> l(lock_);
  DCHECK(initialized_);
  DCHECK_GE(bytes, 0);
  // Cannot release more than was allocated from the reservation.
  DCHECK_LE(bytes, used_reservation_);
  UpdateUsedReservation(-bytes);
  CheckConsistency();
}
+
// Returns this tracker's total reservation in bytes.
int64_t ReservationTracker::GetReservation() {
  lock_guard<SpinLock> l(lock_);
  DCHECK(initialized_);
  return reservation_;
}
+
// Returns the bytes of the reservation used directly at this tracker.
int64_t ReservationTracker::GetUsedReservation() {
  lock_guard<SpinLock> l(lock_);
  DCHECK(initialized_);
  return used_reservation_;
}
+
// Returns the bytes of the reservation neither used here nor given to children.
int64_t ReservationTracker::GetUnusedReservation() {
  lock_guard<SpinLock> l(lock_);
  DCHECK(initialized_);
  return unused_reservation();
}
+
// Returns the total bytes of this tracker's reservation held by its children.
int64_t ReservationTracker::GetChildReservations() {
  lock_guard<SpinLock> l(lock_);
  DCHECK(initialized_);
  return child_reservations_;
}
+
void ReservationTracker::CheckConsistency() const {
  // Check internal invariants (see class comment in the header).
  DCHECK_GE(reservation_, 0);
  DCHECK_LE(reservation_, reservation_limit_);
  DCHECK_GE(child_reservations_, 0);
  DCHECK_GE(used_reservation_, 0);
  DCHECK_LE(used_reservation_ + child_reservations_, reservation_);

  // Counters must mirror the internal state: current_value() is the live value,
  // value() is the high-water mark.
  DCHECK_EQ(reservation_, counters_.peak_reservation->current_value());
  DCHECK_LE(reservation_, counters_.peak_reservation->value());
  DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value());
  DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value());
  DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value());
}
+
void ReservationTracker::UpdateUsedReservation(int64_t delta) {
  used_reservation_ += delta;
  // Keep the counter in sync; CheckConsistency() verifies current value and peak.
  COUNTER_SET(counters_.peak_used_reservation, used_reservation_);
  CheckConsistency();
}
+
void ReservationTracker::UpdateReservation(int64_t delta) {
  reservation_ += delta;
  // Keep the counter in sync; CheckConsistency() verifies current value and peak.
  COUNTER_SET(counters_.peak_reservation, reservation_);
  CheckConsistency();
}
+
+string ReservationTracker::DebugString() {
+  lock_guard<SpinLock> l(lock_);
+  if (!initialized_) return "<ReservationTracker>: uninitialized";
+
+  string parent_debug_string = parent_ == NULL ? "NULL" : 
parent_->DebugString();
+  return Substitute(
+      "<ReservationTracker>: reservation_limit $0 reservation $1 
used_reservation $2 "
+      "child_reservations $3 parent:\n$4",
+      reservation_limit_, reservation_, used_reservation_, child_reservations_,
+      parent_debug_string);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h 
b/be/src/runtime/bufferpool/reservation-tracker.h
new file mode 100644
index 0000000..c786f80
--- /dev/null
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_RESERVATION_TRACKER_H
+#define IMPALA_RUNTIME_RESERVATION_TRACKER_H
+
+#include <stdint.h>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/locks.hpp>
+#include <string>
+
+#include "runtime/bufferpool/reservation-tracker-counters.h"
+#include "common/status.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class DummyProfile;
+class MemTracker;
+class RuntimeProfile;
+
/// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes.
/// A hierarchy of ReservationTrackers provides a mechanism for subdividing buffer pool
/// memory and enforcing upper and lower bounds on memory usage.
///
/// The root of the tracker tree enforces a global maximum, which is distributed among
/// its children. Each tracker in the tree has a 'reservation': the total bytes of
/// buffer pool memory it is entitled to use. The reservation is inclusive of any
/// memory that is already allocated from the reservation, i.e. using a reservation to
/// allocate memory does not subtract from the reservation.
///
/// A reservation can be used directly at the tracker by calling AllocateFrom(), or
/// distributed to children of the tracker for the childrens' reservations. Each
/// tracker in the tree can use up to its reservation without checking parent trackers.
/// To increase its reservation, a tracker must use some of its parent's reservation
/// (and perhaps increase reservations all the way to the root of the tree).
///
/// Each tracker also has a maximum reservation that is enforced. E.g. if the root of
/// the tracker hierarchy is the global tracker for the Impala daemon and the next
/// level of the hierarchy is made up of per-query trackers, then the maximum
/// reservation mechanism can enforce both process-level and query-level limits on
/// reservations.
///
/// Invariants:
/// * A tracker's reservation is at most its reservation limit: reservation <= limit
/// * A tracker's reservation is at least the sum of its childrens' reservations plus
///   the amount of the reservation used directly at this tracker. The difference is
///   the unused reservation:
///     child_reservations + used_reservation + unused_reservation = reservation.
///
/// Thread-safety:
/// All public ReservationTracker methods are thread-safe. If multiple threads
/// concurrently invoke methods on a ReservationTracker, each operation is applied
/// atomically to leave the ReservationTracker in a consistent state. Calling threads
/// are responsible for coordinating to avoid violating any method preconditions,
/// e.g. ensuring that there is sufficient unused reservation before calling
/// AllocateFrom().
///
/// Integration with MemTracker hierarchy:
/// TODO: we will remove MemTracker and this integration once all memory is accounted
/// via reservations.
///
/// Each ReservationTracker can optionally have a linked MemTracker. E.g. an exec
/// node's ReservationTracker can be linked with the exec node's MemTracker, so that
/// reservations are included in query memory consumption for the purposes of enforcing
/// memory limits, reporting and logging. The reservation is accounted as consumption
/// against the linked MemTracker and its ancestors because reserved memory is
/// committed. Allocating from a reservation therefore does not change the consumption
/// reflected in the MemTracker hierarchy.
///
/// MemTracker limits are only checked via the topmost link (i.e. the query-level
/// trackers): we require that no MemTrackers below this level have limits.
///
/// We require that the MemTracker hierarchy is consistent with the ReservationTracker
/// hierarchy. I.e. if a ReservationTracker is linked to a MemTracker "A", and its
/// parent is linked to a MemTracker "B", then "B" must be the parent of "A".
class ReservationTracker {
 public:
  ReservationTracker();
  virtual ~ReservationTracker();

  /// Initializes the root tracker with the given reservation limit in bytes. The
  /// initial reservation is 0.
  /// If 'profile' is not NULL, the counters defined in ReservationTrackerCounters are
  /// added to 'profile'.
  void InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit);

  /// Initializes a new ReservationTracker with a parent.
  /// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its
  /// children will be counted as consumption against 'mem_tracker'.
  /// 'reservation_limit' is the maximum reservation for this tracker in bytes.
  /// If 'profile' is not NULL, the counters in 'counters_' are added to 'profile'.
  void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent,
      MemTracker* mem_tracker, int64_t reservation_limit);

  /// If the tracker is initialized, deregister the ReservationTracker from its parent,
  /// relinquishing all this tracker's reservation. All of the reservation must be
  /// unused and all the tracker's children must be closed before calling this method.
  void Close();

  /// Request to increase reservation by 'bytes'. The request is either granted in
  /// full or not at all. Uses any unused reservation on ancestors and increases
  /// ancestors' reservations if needed to fit the increased reservation.
  /// Returns true if the reservation increase is granted, or false if not granted.
  /// If the reservation is not granted, no modifications are made to the state of
  /// any ReservationTrackers.
  bool IncreaseReservation(int64_t bytes);

  /// Tries to ensure that 'bytes' of unused reservation is available. If not already
  /// available, tries to increase the reservation such that the unused reservation is
  /// exactly equal to 'bytes'. Uses any unused reservation on ancestors and increases
  /// ancestors' reservations if needed to fit the increased reservation.
  /// Returns true if the reservation increase was successful or not necessary.
  bool IncreaseReservationToFit(int64_t bytes);

  /// Decrease tracker's reservation by 'bytes'. This tracker's reservation must be at
  /// least 'bytes' before calling this method.
  /// TODO: decide on and implement policy for how far to release the reservation up
  /// the tree. Currently the reservation is released all the way to the root.
  void DecreaseReservation(int64_t bytes);

  /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes'
  /// unused reservation before calling this method.
  void AllocateFrom(int64_t bytes);

  /// Release 'bytes' of previously allocated memory. The used reservation is
  /// decreased by 'bytes'. The used reservation must be at least 'bytes' before
  /// calling this method.
  void ReleaseTo(int64_t bytes);

  /// Returns the amount of the reservation in bytes.
  int64_t GetReservation();

  /// Returns the current amount of the reservation used at this tracker, not including
  /// reservations of children, in bytes.
  int64_t GetUsedReservation();

  /// Returns the amount of the reservation neither used nor given to childrens'
  /// reservations at this tracker, in bytes.
  int64_t GetUnusedReservation();

  /// Returns the total reservations of children in bytes.
  int64_t GetChildReservations();

  /// Returns a human-readable description of this tracker and its ancestors.
  std::string DebugString();

 private:
  /// Returns the amount of 'reservation_' that is unused.
  inline int64_t unused_reservation() const {
    return reservation_ - used_reservation_ - child_reservations_;
  }

  /// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise.
  MemTracker* GetParentMemTracker() const {
    return parent_ == NULL ? NULL : parent_->mem_tracker_;
  }

  /// Initializes 'counters_', storing the counters in 'profile'.
  /// If 'profile' is NULL, creates a dummy profile to store the counters.
  void InitCounters(RuntimeProfile* profile, int64_t max_reservation);

  /// Internal helper for IncreaseReservation(). If 'use_existing_reservation' is true,
  /// increase by the minimum amount so that 'bytes' fits in the reservation, otherwise
  /// just increase by 'bytes'. If 'is_child_reservation' is true, also increase
  /// 'child_reservations_' by 'bytes'.
  /// 'lock_' must be held by caller.
  bool IncreaseReservationInternalLocked(
      int64_t bytes, bool use_existing_reservation, bool is_child_reservation);

  /// Update consumption on linked MemTracker. For the topmost link, return false if
  /// this failed because it would exceed a memory limit. If there is no linked
  /// MemTracker, just returns true.
  /// TODO: remove once we account all memory via ReservationTrackers.
  bool TryUpdateMemTracker(int64_t reservation_increase);

  /// Internal helper for DecreaseReservation(). This behaves the same as
  /// DecreaseReservation(), except when 'is_child_reservation' is true it decreases
  /// 'child_reservations_' by 'bytes'.
  void DecreaseReservationInternal(int64_t bytes, bool is_child_reservation);

  /// Same as DecreaseReservationInternal(), but 'lock_' must be held by caller.
  void DecreaseReservationInternalLocked(int64_t bytes, bool is_child_reservation);

  /// Check the internal consistency of the ReservationTracker and DCHECKs if in an
  /// inconsistent state.
  /// 'lock_' must be held by caller.
  void CheckConsistency() const;

  /// Increase or decrease 'used_reservation_' and update profile counters accordingly.
  /// 'lock_' must be held by caller.
  void UpdateUsedReservation(int64_t delta);

  /// Increase or decrease 'reservation_' and update profile counters accordingly.
  /// 'lock_' must be held by caller.
  void UpdateReservation(int64_t delta);

  /// lock_ protects all members. In a hierarchy of trackers, locks must be acquired
  /// from the bottom-up.
  SpinLock lock_;

  /// True if the tracker is initialized.
  bool initialized_;

  /// A dummy profile to hold the counters in 'counters_' in the case that no profile
  /// is provided.
  boost::scoped_ptr<DummyProfile> dummy_profile_;

  /// The RuntimeProfile counters for this tracker.
  /// All non-NULL if 'initialized_' is true.
  ReservationTrackerCounters counters_;

  /// The parent of this tracker in the hierarchy. Does not change after
  /// initialization.
  ReservationTracker* parent_;

  /// If non-NULL, reservations are counted as memory consumption against this tracker.
  /// Does not change after initialization. Not owned.
  /// TODO: remove once all memory is accounted via ReservationTrackers.
  MemTracker* mem_tracker_;

  /// The maximum reservation in bytes that this tracker can have.
  int64_t reservation_limit_;

  /// This tracker's current reservation in bytes.
  /// 'reservation_' <= 'reservation_limit_'.
  int64_t reservation_;

  /// Total reservation of children in bytes. This is included in 'reservation_'.
  /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
  int64_t child_reservations_;

  /// The amount of the reservation currently used by this tracker in bytes.
  /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
  int64_t used_reservation_;
};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index a9de160..dc855da 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -22,7 +22,7 @@
 #include <gperftools/malloc_extension.h>
 #include <gutil/strings/substitute.h>
 
-#include "bufferpool/reservation-tracker-counters.h"
+#include "runtime/bufferpool/reservation-tracker-counters.h"
 #include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"

Reply via email to