http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker.cc b/be/src/bufferpool/reservation-tracker.cc
deleted file mode 100644
index c5ed086..0000000
--- a/be/src/bufferpool/reservation-tracker.cc
+++ /dev/null
@@ -1,306 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "bufferpool/reservation-tracker.h"
-
-#include <algorithm>
-
-#include "common/object-pool.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/mem-tracker.h"
-#include "util/dummy-runtime-profile.h"
-#include "util/runtime-profile-counters.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-ReservationTracker::ReservationTracker() : initialized_(false), mem_tracker_(NULL) {}
-
-ReservationTracker::~ReservationTracker() {
-  DCHECK(!initialized_);
-}
-
-void ReservationTracker::InitRootTracker(
-    RuntimeProfile* profile, int64_t reservation_limit) {
-  lock_guard<SpinLock> l(lock_);
-  DCHECK(!initialized_);
-  parent_ = NULL;
-  mem_tracker_ = NULL;
-  reservation_limit_ = reservation_limit;
-  reservation_ = 0;
-  used_reservation_ = 0;
-  child_reservations_ = 0;
-  initialized_ = true;
-
-  InitCounters(profile, reservation_limit_);
-  COUNTER_SET(counters_.peak_reservation, reservation_);
-
-  CheckConsistency();
-}
-
-void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
-    ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) {
-  DCHECK(parent != NULL);
-  DCHECK_GE(reservation_limit, 0);
-
-  lock_guard<SpinLock> l(lock_);
-  DCHECK(!initialized_);
-  parent_ = parent;
-  mem_tracker_ = mem_tracker;
-
-  reservation_limit_ = reservation_limit;
-  reservation_ = 0;
-  used_reservation_ = 0;
-  child_reservations_ = 0;
-  initialized_ = true;
-
-  if (mem_tracker_ != NULL) {
-    MemTracker* parent_mem_tracker = GetParentMemTracker();
-    if (parent_mem_tracker != NULL) {
-      // Make sure the parent links of the MemTrackers correspond to our parent links.
-      DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
-      // Make sure we don't have a lower limit than the ancestor, since we don't enforce
-      // limits at lower links.
-      DCHECK_EQ(mem_tracker_->lowest_limit(), parent_mem_tracker->lowest_limit());
-    } else {
-      // Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent
-      // shouldn't have a MemTracker.
-      ReservationTracker* ancestor = parent_;
-      while (ancestor != NULL) {
-        DCHECK(ancestor->mem_tracker_ == NULL);
-        ancestor = ancestor->parent_;
-      }
-    }
-  }
-
-  InitCounters(profile, reservation_limit_);
-
-  CheckConsistency();
-}
-
-void ReservationTracker::InitCounters(
-    RuntimeProfile* profile, int64_t reservation_limit) {
-  bool profile_provided = profile != NULL;
-  if (profile == NULL) {
-    dummy_profile_.reset(new DummyProfile);
-    profile = dummy_profile_->profile();
-  }
-
-  // Check that another tracker's counters aren't already registered in the profile.
-  DCHECK(profile->GetCounter("BufferPoolInitialReservation") == NULL);
-  counters_.reservation_limit =
-      ADD_COUNTER(profile, "BufferPoolReservationLimit", TUnit::BYTES);
-  counters_.peak_reservation =
-      profile->AddHighWaterMarkCounter("BufferPoolPeakReservation", TUnit::BYTES);
-  counters_.peak_used_reservation =
-      profile->AddHighWaterMarkCounter("BufferPoolPeakUsedReservation", TUnit::BYTES);
-
-  COUNTER_SET(counters_.reservation_limit, reservation_limit);
-
-  if (mem_tracker_ != NULL && profile_provided) {
-    mem_tracker_->EnableReservationReporting(counters_);
-  }
-}
-
-void ReservationTracker::Close() {
-  lock_guard<SpinLock> l(lock_);
-  if (!initialized_) return;
-  CheckConsistency();
-  DCHECK_EQ(used_reservation_, 0);
-  DCHECK_EQ(child_reservations_, 0);
-  // Release any reservation to parent.
-  if (parent_ != NULL) DecreaseReservationInternalLocked(reservation_, false);
-  mem_tracker_ = NULL;
-  parent_ = NULL;
-  initialized_ = false;
-}
-
-bool ReservationTracker::IncreaseReservation(int64_t bytes) {
-  lock_guard<SpinLock> l(lock_);
-  return IncreaseReservationInternalLocked(bytes, false, false);
-}
-
-bool ReservationTracker::IncreaseReservationToFit(int64_t bytes) {
-  lock_guard<SpinLock> l(lock_);
-  return IncreaseReservationInternalLocked(bytes, true, false);
-}
-
-bool ReservationTracker::IncreaseReservationInternalLocked(
-    int64_t bytes, bool use_existing_reservation, bool is_child_reservation) {
-  DCHECK(initialized_);
-  int64_t reservation_increase =
-      use_existing_reservation ? max<int64_t>(0, bytes - unused_reservation()) : bytes;
-  DCHECK_GE(reservation_increase, 0);
-
-  bool granted;
-  // Check if the increase is allowed, starting at the bottom of hierarchy.
-  if (reservation_ + reservation_increase > reservation_limit_) {
-    granted = false;
-  } else if (reservation_increase == 0) {
-    granted = true;
-  } else {
-    if (parent_ == NULL) {
-      granted = true;
-    } else {
-      lock_guard<SpinLock> l(parent_->lock_);
-      granted =
-          parent_->IncreaseReservationInternalLocked(reservation_increase, true, true);
-    }
-    if (granted && !TryUpdateMemTracker(reservation_increase)) {
-      granted = false;
-      // Roll back changes to ancestors if MemTracker update fails.
-      parent_->DecreaseReservationInternal(reservation_increase, true);
-    }
-  }
-
-  if (granted) {
-    // The reservation was granted and state updated in all ancestors: we can modify
-    // this tracker's state now.
-    UpdateReservation(reservation_increase);
-    if (is_child_reservation) child_reservations_ += bytes;
-  }
-
-  CheckConsistency();
-  return granted;
-}
-
-bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase) {
-  if (mem_tracker_ == NULL) return true;
-  if (GetParentMemTracker() == NULL) {
-    // At the topmost link, which may be a MemTracker with a limit, we need to use
-    // TryConsume() to check the limit.
-    return mem_tracker_->TryConsume(reservation_increase);
-  } else {
-    // For lower links, there shouldn't be a limit to enforce, so we just need to
-    // update the consumption of the linked MemTracker since the reservation is
-    // already reflected in its parent.
-    mem_tracker_->ConsumeLocal(reservation_increase, GetParentMemTracker());
-    return true;
-  }
-}
-
-void ReservationTracker::DecreaseReservation(int64_t bytes) {
-  DecreaseReservationInternal(bytes, false);
-}
-
-void ReservationTracker::DecreaseReservationInternal(
-    int64_t bytes, bool is_child_reservation) {
-  lock_guard<SpinLock> l(lock_);
-  DecreaseReservationInternalLocked(bytes, is_child_reservation);
-}
-
-void ReservationTracker::DecreaseReservationInternalLocked(
-    int64_t bytes, bool is_child_reservation) {
-  DCHECK(initialized_);
-  DCHECK_GE(reservation_, bytes);
-  if (bytes == 0) return;
-  if (is_child_reservation) child_reservations_ -= bytes;
-  UpdateReservation(-bytes);
-  // The reservation should be returned up the tree.
-  if (mem_tracker_ != NULL) {
-    if (GetParentMemTracker() == NULL) {
-      mem_tracker_->Release(bytes);
-    } else {
-      mem_tracker_->ReleaseLocal(bytes, GetParentMemTracker());
-    }
-  }
-  if (parent_ != NULL) parent_->DecreaseReservationInternal(bytes, true);
-  CheckConsistency();
-}
-
-void ReservationTracker::AllocateFrom(int64_t bytes) {
-  lock_guard<SpinLock> l(lock_);
-  DCHECK(initialized_);
-  DCHECK_GE(bytes, 0);
-  DCHECK_LE(bytes, unused_reservation());
-  UpdateUsedReservation(bytes);
-  CheckConsistency();
-}
-
-void ReservationTracker::ReleaseTo(int64_t bytes) {
-  lock_guard<SpinLock> l(lock_);
-  DCHECK(initialized_);
-  DCHECK_GE(bytes, 0);
-  DCHECK_LE(bytes, used_reservation_);
-  UpdateUsedReservation(-bytes);
-  CheckConsistency();
-}
-
-int64_t ReservationTracker::GetReservation() {
-  lock_guard<SpinLock> l(lock_);
-  DCHECK(initialized_);
-  return reservation_;
-}
-
-int64_t ReservationTracker::GetUsedReservation() {
-  lock_guard<SpinLock> l(lock_);
-  DCHECK(initialized_);
-  return used_reservation_;
-}
-
-int64_t ReservationTracker::GetUnusedReservation() {
-  lock_guard<SpinLock> l(lock_);
-  DCHECK(initialized_);
-  return unused_reservation();
-}
-
-int64_t ReservationTracker::GetChildReservations() {
-  lock_guard<SpinLock> l(lock_);
-  DCHECK(initialized_);
-  return child_reservations_;
-}
-
-void ReservationTracker::CheckConsistency() const {
-  // Check internal invariants.
-  DCHECK_GE(reservation_, 0);
-  DCHECK_LE(reservation_, reservation_limit_);
-  DCHECK_GE(child_reservations_, 0);
-  DCHECK_GE(used_reservation_, 0);
-  DCHECK_LE(used_reservation_ + child_reservations_, reservation_);
-
-  DCHECK_EQ(reservation_, counters_.peak_reservation->current_value());
-  DCHECK_LE(reservation_, counters_.peak_reservation->value());
-  DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value());
-  DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value());
-  DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value());
-}
-
-void ReservationTracker::UpdateUsedReservation(int64_t delta) {
-  used_reservation_ += delta;
-  COUNTER_SET(counters_.peak_used_reservation, used_reservation_);
-  CheckConsistency();
-}
-
-void ReservationTracker::UpdateReservation(int64_t delta) {
-  reservation_ += delta;
-  COUNTER_SET(counters_.peak_reservation, reservation_);
-  CheckConsistency();
-}
-
-string ReservationTracker::DebugString() {
-  lock_guard<SpinLock> l(lock_);
-  if (!initialized_) return "<ReservationTracker>: uninitialized";
-
-  string parent_debug_string = parent_ == NULL ? "NULL" : parent_->DebugString();
-  return Substitute(
-      "<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 "
-      "child_reservations $3 parent:\n$4",
-      reservation_limit_, reservation_, used_reservation_, child_reservations_,
-      parent_debug_string);
-}
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker.h b/be/src/bufferpool/reservation-tracker.h
deleted file mode 100644
index 6bdecf0..0000000
--- a/be/src/bufferpool/reservation-tracker.h
+++ /dev/null
@@ -1,248 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RESERVATION_TRACKER_H
-#define IMPALA_RESERVATION_TRACKER_H
-
-#include <stdint.h>
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread/locks.hpp>
-#include <string>
-
-#include "bufferpool/reservation-tracker-counters.h"
-#include "common/status.h"
-#include "util/spinlock.h"
-
-namespace impala {
-
-class DummyProfile;
-class MemTracker;
-class RuntimeProfile;
-
-/// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes.
-/// A hierarchy of ReservationTrackers provides a mechanism for subdividing buffer pool
-/// memory and enforcing upper and lower bounds on memory usage.
-///
-/// The root of the tracker tree enforces a global maximum, which is distributed among its
-/// children. Each tracker in the tree has a 'reservation': the total bytes of buffer pool
-/// memory it is entitled to use. The reservation is inclusive of any memory that is
-/// already allocated from the reservation, i.e. using a reservation to allocate memory
-/// does not subtract from the reservation.
-///
-/// A reservation can be used directly at the tracker by calling AllocateFrom(), or
-/// distributed to children of the tracker for the childrens' reservations. Each tracker
-/// in the tree can use up to its reservation without checking parent trackers. To
-/// increase its reservation, a tracker must use some of its parent's reservation (and
-/// perhaps increase reservations all the way to the root of the tree).
-///
-/// Each tracker also has a maximum reservation that is enforced. E.g. if the root of the
-/// tracker hierarchy is the global tracker for the Impala daemon and the next level of
-/// the hierarchy is made up of per-query trackers, then the maximum reservation
-/// mechanism can enforce both process-level and query-level limits on reservations.
-///
-/// Invariants:
-/// * A tracker's reservation is at most its reservation limit: reservation <= limit
-/// * A tracker's reservation is at least the sum of its childrens' reservations plus
-///   the amount of the reservation used directly at this tracker. The difference is
-///   the unused reservation:
-///     child_reservations + used_reservation + unused_reservation = reservation.
-///
-/// Thread-safety:
-/// All public ReservationTracker methods are thread-safe. If multiple threads
-/// concurrently invoke methods on a ReservationTracker, each operation is applied
-/// atomically to leave the ReservationTracker in a consistent state. Calling threads
-/// are responsible for coordinating to avoid violating any method preconditions,
-/// e.g. ensuring that there is sufficient unused reservation before calling AllocateTo().
-///
-/// Integration with MemTracker hierarchy:
-/// TODO: we will remove MemTracker and this integration once all memory is accounted via
-/// reservations.
-///
-/// Each ReservationTracker can optionally have a linked MemTracker. E.g. an exec
-/// node's ReservationTracker can be linked with the exec node's MemTracker, so that
-/// reservations are included in query memory consumption for the purposes of enforcing
-/// memory limits, reporting and logging. The reservation is accounted as consumption
-/// against the linked MemTracker and its ancestors because reserved memory is committed.
-/// Allocating from a reservation therefore does not change the consumption reflected in
-/// the MemTracker hierarchy.
-///
-/// MemTracker limits are only checked via the topmost link (i.e. the query-level
-/// trackers): we require that no MemTrackers below this level have limits.
-///
-/// We require that the MemTracker hierarchy is consistent with the ReservationTracker
-/// hierarchy. I.e. if a ReservationTracker is linked to a MemTracker "A", and its parent
-/// is linked to a MemTracker "B", then "B" must be the parent of "A"'.
-class ReservationTracker {
- public:
-  ReservationTracker();
-  virtual ~ReservationTracker();
-
-  /// Initializes the root tracker with the given reservation limit in bytes. The initial
-  /// reservation is 0.
-  /// if 'profile' is not NULL, the counters defined in ReservationTrackerCounters are
-  /// added to 'profile'.
-  void InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit);
-
-  /// Initializes a new ReservationTracker with a parent.
-  /// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its
-  /// children will be counted as consumption against 'mem_tracker'.
-  /// 'reservation_limit' is the maximum reservation for this tracker in bytes.
-  /// if 'profile' is not NULL, the counters in 'counters_' are added to 'profile'.
-  void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent,
-      MemTracker* mem_tracker, int64_t reservation_limit);
-
-  /// If the tracker is initialized, deregister the ReservationTracker from its parent,
-  /// relinquishing all this tracker's reservation. All of the reservation must be unused
-  /// and all the tracker's children must be closed before calling this method.
-  void Close();
-
-  /// Request to increase reservation by 'bytes'. The request is either granted in
-  /// full or not at all. Uses any unused reservation on ancestors and increase
-  /// ancestors' reservations if needed to fit the increased reservation.
-  /// Returns true if the reservation increase is granted, or false if not granted.
-  /// If the reservation is not granted, no modifications are made to the state of
-  /// any ReservationTrackers.
-  bool IncreaseReservation(int64_t bytes);
-
-  /// Tries to ensure that 'bytes' of unused reservation is available. If not already
-  /// available, tries to increase the reservation such that the unused reservation is
-  /// exactly equal to 'bytes'. Uses any unused reservation on ancestors and increase
-  /// ancestors' reservations if needed to fit the increased reservation.
-  /// Returns true if the reservation increase was successful or not necessary.
-  bool IncreaseReservationToFit(int64_t bytes);
-
-  /// Decrease tracker's reservation by 'bytes'. This tracker's reservation must be at
-  /// least 'bytes' before calling this method.
-  /// TODO: decide on and implement policy for how far to release the reservation up
-  /// the tree. Currently the reservation is released all the way to the root.
-  void DecreaseReservation(int64_t bytes);
-
-  /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes'
-  /// unused reservation before calling this method.
-  void AllocateFrom(int64_t bytes);
-
-  /// Release 'bytes' of previously allocated memory. The used reservation is
-  /// decreased by 'bytes'. Before the call, the used reservation must be at least
-  /// 'bytes' before calling this method.
-  void ReleaseTo(int64_t bytes);
-
-  /// Returns the amount of the reservation in bytes.
-  int64_t GetReservation();
-
-  /// Returns the current amount of the reservation used at this tracker, not including
-  /// reservations of children in bytes.
-  int64_t GetUsedReservation();
-
-  /// Returns the amount of the reservation neither used nor given to childrens'
-  /// reservations at this tracker in bytes.
-  int64_t GetUnusedReservation();
-
-  /// Returns the total reservations of children in bytes.
-  int64_t GetChildReservations();
-
-  std::string DebugString();
-
- private:
-  /// Returns the amount of 'reservation_' that is unused.
-  inline int64_t unused_reservation() const {
-    return reservation_ - used_reservation_ - child_reservations_;
-  }
-
-  /// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise.
-  MemTracker* GetParentMemTracker() const {
-    return parent_ == NULL ? NULL : parent_->mem_tracker_;
-  }
-
-  /// Initializes 'counters_', storing the counters in 'profile'.
-  /// If 'profile' is NULL, creates a dummy profile to store the counters.
-  void InitCounters(RuntimeProfile* profile, int64_t max_reservation);
-
-  /// Internal helper for IncreaseReservation(). If 'use_existing_reservation' is true,
-  /// increase by the minimum amount so that 'bytes' fits in the reservation, otherwise
-  /// just increase by 'bytes'. If 'is_child_reservation' is true, also increase
-  /// 'child_reservations_' by 'bytes'.
-  /// 'lock_' must be held by caller.
-  bool IncreaseReservationInternalLocked(
-      int64_t bytes, bool use_existing_reservation, bool is_child_reservation);
-
-  /// Update consumption on linked MemTracker. For the topmost link, return false if
-  /// this failed because it would exceed a memory limit. If there is no linked
-  /// MemTracker, just returns true.
-  /// TODO: remove once we account all memory via ReservationTrackers.
-  bool TryUpdateMemTracker(int64_t reservation_increase);
-
-  /// Internal helper for DecreaseReservation(). This behaves the same as
-  /// DecreaseReservation(), except when 'is_child_reservation' is true it decreases
-  /// 'child_reservations_' by 'bytes'.
-  void DecreaseReservationInternal(int64_t bytes, bool is_child_reservation);
-
-  /// Same as DecreaseReservationInternal(), but 'lock_' must be held by caller.
-  void DecreaseReservationInternalLocked(int64_t bytes, bool is_child_reservation);
-
-  /// Check the internal consistency of the ReservationTracker and DCHECKs if in an
-  /// inconsistent state.
-  /// 'lock_' must be held by caller.
-  void CheckConsistency() const;
-
-  /// Increase or decrease 'used_reservation_' and update profile counters accordingly.
-  /// 'lock_' must be held by caller.
-  void UpdateUsedReservation(int64_t delta);
-
-  /// Increase or decrease 'reservation_' and update profile counters accordingly.
-  /// 'lock_' must be held by caller.
-  void UpdateReservation(int64_t delta);
-
-  /// lock_ protects all members. In a hierarchy of trackers, locks must be acquired
-  /// from the bottom-up.
-  SpinLock lock_;
-
-  /// True if the tracker is initialized.
-  bool initialized_;
-
-  /// A dummy profile to hold the counters in 'counters_' in the case that no profile
-  /// is provided.
-  boost::scoped_ptr<DummyProfile> dummy_profile_;
-
-  /// The RuntimeProfile counters for this tracker.
-  /// All non-NULL if 'initialized_' is true.
-  ReservationTrackerCounters counters_;
-
-  /// The parent of this tracker in the hierarchy. Does not change after initialization.
-  ReservationTracker* parent_;
-
-  /// If non-NULL, reservations are counted as memory consumption against this tracker.
-  /// Does not change after initialization. Not owned.
-  /// TODO: remove once all memory is accounted via ReservationTrackers.
-  MemTracker* mem_tracker_;
-
-  /// The maximum reservation in bytes that this tracker can have.
-  int64_t reservation_limit_;
-
-  /// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'.
-  int64_t reservation_;
-
-  /// Total reservation of children in bytes. This is included in 'reservation_'.
-  /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
-  int64_t child_reservations_;
-
-  /// The amount of the reservation currently used by this tracker in bytes.
-  /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
-  int64_t used_reservation_;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 19db090..54a1347 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+add_subdirectory(bufferpool)
 
 # where to put generated libraries
 set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/CMakeLists.txt b/be/src/runtime/bufferpool/CMakeLists.txt
new file mode 100644
index 0000000..758d538
--- /dev/null
+++ b/be/src/runtime/bufferpool/CMakeLists.txt
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# where to put generated libraries
+set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/bufferpool")
+
+# where to put generated binaries
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/bufferpool")
+
+add_library(BufferPool
+  buffer-allocator.cc
+  buffer-pool.cc
+  reservation-tracker.cc
+)
+add_dependencies(BufferPool thrift-deps)
+
+ADD_BE_TEST(buffer-pool-test)
+ADD_BE_TEST(reservation-tracker-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.cc b/be/src/runtime/bufferpool/buffer-allocator.cc
new file mode 100644
index 0000000..3befe76
--- /dev/null
+++ b/be/src/runtime/bufferpool/buffer-allocator.cc
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/bufferpool/buffer-allocator.h"
+
+#include "util/bit-util.h"
+
+namespace impala {
+
+BufferAllocator::BufferAllocator(int64_t min_buffer_len)
+  : min_buffer_len_(min_buffer_len) {}
+
+Status BufferAllocator::Allocate(int64_t len, uint8_t** buffer) {
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  *buffer = reinterpret_cast<uint8_t*>(malloc(len));
+  if (*buffer == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len);
+  return Status::OK();
+}
+
+void BufferAllocator::Free(uint8_t* buffer, int64_t len) {
+  free(buffer);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h
new file mode 100644
index 0000000..54b667a
--- /dev/null
+++ b/be/src/runtime/bufferpool/buffer-allocator.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BUFFER_ALLOCATOR_H
+#define IMPALA_RUNTIME_BUFFER_ALLOCATOR_H
+
+#include "common/status.h"
+
+namespace impala {
+
+/// The underlying memory allocator for the buffer pool. All buffers are allocated through
+/// the BufferPool's BufferAllocator. The allocator only handles allocating buffers that
+/// are power-of-two multiples of the minimum buffer length.
+///
+/// TODO:
+/// * Allocate memory with mmap() instead of malloc().
+/// * Implement free lists in the allocator or external to the allocator.
+class BufferAllocator {
+ public:
+  BufferAllocator(int64_t min_buffer_len);
+
+  /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple
+  /// of the minimum buffer length.
+  Status Allocate(int64_t len, uint8_t** buffer);
+
+  /// Free the memory for a previously-allocated buffer.
+  void Free(uint8_t* buffer, int64_t len);
+
+ private:
+  const int64_t min_buffer_len_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-pool-counters.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-counters.h b/be/src/runtime/bufferpool/buffer-pool-counters.h
new file mode 100644
index 0000000..e4b1c36
--- /dev/null
+++ b/be/src/runtime/bufferpool/buffer-pool-counters.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BUFFER_POOL_COUNTERS_H
+#define IMPALA_RUNTIME_BUFFER_POOL_COUNTERS_H
+
+#include "util/runtime-profile.h"
+
+namespace impala {
+
+/// A set of counters for each buffer pool client.
+struct BufferPoolClientCounters {
+ public:
+  /// Amount of time spent trying to get a buffer.
+  RuntimeProfile::Counter* get_buffer_time;
+
+  /// Amount of time spent waiting for reads from disk to complete.
+  RuntimeProfile::Counter* read_wait_time;
+
+  /// Amount of time spent waiting for writes to disk to complete.
+  RuntimeProfile::Counter* write_wait_time;
+
+  /// The peak total size of unpinned buffers.
+  RuntimeProfile::HighWaterMarkCounter* peak_unpinned_bytes;
+
+  /// The total bytes of data unpinned. Every time a page's pin count goes from 1 to 0,
+  /// this counter is incremented by the page size.
+  RuntimeProfile::Counter* total_unpinned_bytes;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
new file mode 100644
index 0000000..af84bbe
--- /dev/null
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -0,0 +1,554 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/bind.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/unordered_map.hpp>
+#include <cstdlib>
+#include <string>
+#include <vector>
+
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "common/init.h"
+#include "common/object-pool.h"
+#include "testutil/death-test-util.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+class BufferPoolTest : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  virtual void TearDown() {
+    // Close every per-query tracker, then the root tracker, then free everything the
+    // object pool owns (e.g. trackers and profiles created by the helpers below).
+    for (const auto& entry : query_reservations_) {
+      ReservationTracker* tracker = entry.second;
+      tracker->Close();
+    }
+
+    global_reservations_.Close();
+    obj_pool_.Clear();
+  }
+
+  /// The minimum buffer size used in most tests.
+  static const int64_t TEST_BUFFER_LEN = 1024;
+
+  /// Test helper to simulate registering then deregistering a number of queries with
+  /// the given initial reservation and reservation limit.
+  void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries,
+      int64_t initial_query_reservation, int64_t query_reservation_limit);
+
+  /// Create and destroy a page multiple times.
+  void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, int num_ops);
+
+ protected:
+  /// Packs 'hi' into the upper 32 bits and 'lo' into the lower 32 bits of a query id.
+  /// The composition is done in unsigned arithmetic so that a negative 'lo' cannot
+  /// sign-extend over the upper bits and a negative 'hi' is never left-shifted (which
+  /// is undefined behaviour for signed integers).
+  static int64_t QueryId(int hi, int lo) {
+    const uint64_t high_bits = static_cast<uint64_t>(static_cast<uint32_t>(hi)) << 32;
+    return static_cast<int64_t>(high_bits | static_cast<uint32_t>(lo));
+  }
+
+  /// Helper function to create one reservation tracker per query. Returns the existing
+  /// tracker for 'query_id' if there is one. The tracker is owned by 'obj_pool_'.
+  ReservationTracker* GetQueryReservationTracker(int64_t query_id) {
+    lock_guard<SpinLock> l(query_reservations_lock_);
+    // operator[] inserts a NULL entry the first time 'query_id' is seen; fill that entry
+    // in place so the map is only searched once.
+    ReservationTracker*& tracker = query_reservations_[query_id];
+    if (tracker == NULL) tracker = obj_pool_.Add(new ReservationTracker());
+    return tracker;
+  }
+
+  RuntimeProfile* NewProfile() {
+    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+  }
+
+  ObjectPool obj_pool_;
+
+  ReservationTracker global_reservations_;
+
+  // Map from query_id to the reservation tracker for that query. Reads and modifications
+  // of the map are protected by query_reservations_lock_.
+  unordered_map<int64_t, ReservationTracker*> query_reservations_;
+  SpinLock query_reservations_lock_;
+};
+
+// Out-of-class definition of the static constant. It is needed because the constant is
+// ODR-used, e.g. bound to a const reference by ASSERT_EQ(TEST_BUFFER_LEN, ...).
+const int64_t BufferPoolTest::TEST_BUFFER_LEN;
+
+void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi,
+    int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) {
+  const int clients_per_query = 32;
+  // Per-query arrays of clients and their reservation trackers. std::vector is used
+  // instead of variable-length arrays, which are a non-standard compiler extension.
+  vector<BufferPool::Client*> clients(num_queries);
+  vector<ReservationTracker*> client_reservations(num_queries);
+
+  for (int i = 0; i < num_queries; ++i) {
+    int64_t query_id = QueryId(query_id_hi, i);
+
+    // Initialize a tracker for a new query.
+    ReservationTracker* query_reservation = GetQueryReservationTracker(query_id);
+    query_reservation->InitChildTracker(
+        NULL, &global_reservations_, NULL, query_reservation_limit);
+
+    // Test that closing then reopening child tracker works.
+    query_reservation->Close();
+    query_reservation->InitChildTracker(
+        NULL, &global_reservations_, NULL, query_reservation_limit);
+    EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation));
+
+    clients[i] = new BufferPool::Client[clients_per_query];
+    client_reservations[i] = new ReservationTracker[clients_per_query];
+
+    for (int j = 0; j < clients_per_query; ++j) {
+      // Split the query's initial reservation evenly between its clients, giving one
+      // extra byte to each of the first (reservation % clients_per_query) clients so the
+      // client reservations sum exactly to the query reservation. The parentheses are
+      // required: '<' binds more loosely than '+'.
+      int64_t initial_client_reservation = initial_query_reservation / clients_per_query
+          + (j < initial_query_reservation % clients_per_query);
+      // Reservation limit can be anything greater or equal to the initial reservation.
+      // NOTE(review): rand() is called concurrently from ConcurrentRegistration; the C
+      // standard does not require rand() to be thread-safe (glibc's is internally locked).
+      int64_t client_reservation_limit = initial_client_reservation + rand() % 100000;
+      client_reservations[i][j].InitChildTracker(
+          NULL, query_reservation, NULL, client_reservation_limit);
+      EXPECT_TRUE(
+          client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation));
+      string name = Substitute("Client $0 for query $1", j, query_id);
+      EXPECT_OK(pool->RegisterClient(
+          name, &client_reservations[i][j], NewProfile(), &clients[i][j]));
+    }
+
+    for (int j = 0; j < clients_per_query; ++j) {
+      ASSERT_TRUE(clients[i][j].is_registered());
+    }
+  }
+
+  // Deregister clients then query.
+  for (int i = 0; i < num_queries; ++i) {
+    for (int j = 0; j < clients_per_query; ++j) {
+      pool->DeregisterClient(&clients[i][j]);
+      ASSERT_FALSE(clients[i][j].is_registered());
+      client_reservations[i][j].Close();
+    }
+
+    delete[] clients[i];
+    delete[] client_reservations[i];
+
+    GetQueryReservationTracker(QueryId(query_id_hi, i))->Close();
+  }
+}
+
+/// Test that queries and clients can be registered and deregistered with the reservation
+/// trackers and the buffer pool.
+TEST_F(BufferPoolTest, BasicRegistration) {
+  int num_concurrent_queries = 1024;
+  int64_t sum_initial_reservations = 4;
+  int64_t reservation_limit = 1024;
+  // Need enough buffers for all initial reservations.
+  int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
+  global_reservations_.InitRootTracker(NewProfile(), total_mem);
+
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+
+  RegisterQueriesAndClients(
+      &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit);
+
+  // The checks in this file use gtest assertions rather than DCHECKs: DCHECKs are
+  // compiled out of release builds and would silently verify nothing there.
+  EXPECT_EQ(0, global_reservations_.GetUsedReservation());
+  EXPECT_EQ(0, global_reservations_.GetChildReservations());
+  EXPECT_EQ(0, global_reservations_.GetReservation());
+  global_reservations_.Close();
+}
+
+/// Test that queries and clients can be registered and deregistered by concurrent
+/// threads.
+TEST_F(BufferPoolTest, ConcurrentRegistration) {
+  int queries_per_thread = 64;
+  int num_threads = 64;
+  int num_concurrent_queries = queries_per_thread * num_threads;
+  int64_t sum_initial_reservations = 4;
+  int64_t reservation_limit = 1024;
+  // Need enough buffers for all initial reservations.
+  int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
+  global_reservations_.InitRootTracker(NewProfile(), total_mem);
+
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+
+  // Launch threads, each with a different set of query IDs.
+  thread_group workers;
+  for (int i = 0; i < num_threads; ++i) {
+    workers.add_thread(new thread(bind(&BufferPoolTest::RegisterQueriesAndClients, this,
+        &pool, i, queries_per_thread, sum_initial_reservations, reservation_limit)));
+  }
+  workers.join_all();
+
+  // All the reservations should be released at this point.
+  EXPECT_EQ(0, global_reservations_.GetUsedReservation());
+  EXPECT_EQ(0, global_reservations_.GetChildReservations());
+  EXPECT_EQ(0, global_reservations_.GetReservation());
+  global_reservations_.Close();
+}
+
+/// Test basic page handle creation.
+TEST_F(BufferPoolTest, PageCreation) {
+  // Allocate many pages, each a power-of-two multiple of the minimum page length.
+  int num_pages = 16;
+  int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1);
+  int64_t total_mem = 2 * 2 * max_page_len;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
+  ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+
+  vector<BufferPool::PageHandle> handles(num_pages);
+
+  // Create pages of various valid sizes.
+  for (int i = 0; i < num_pages; ++i) {
+    int size_multiple = 1 << i;
+    int64_t page_len = TEST_BUFFER_LEN * size_multiple;
+    int64_t used_before = client_tracker->GetUsedReservation();
+    ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
+    ASSERT_TRUE(handles[i].is_open());
+    ASSERT_TRUE(handles[i].is_pinned());
+    ASSERT_TRUE(handles[i].buffer_handle() != NULL);
+    ASSERT_TRUE(handles[i].data() != NULL);
+    ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
+    ASSERT_EQ(handles[i].len(), page_len);
+    ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
+    ASSERT_EQ(used_before + page_len, client_tracker->GetUsedReservation());
+  }
+
+  // Close the handles and check memory consumption.
+  for (int i = 0; i < num_pages; ++i) {
+    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t page_len = handles[i].len();
+    pool.DestroyPage(&client, &handles[i]);
+    ASSERT_EQ(used_before - page_len, client_tracker->GetUsedReservation());
+  }
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+
+  // All the reservations should be released at this point.
+  EXPECT_EQ(0, global_reservations_.GetReservation());
+  global_reservations_.Close();
+}
+
+TEST_F(BufferPoolTest, BufferAllocation) {
+  // Allocate many buffers, each a power-of-two multiple of the minimum buffer length.
+  int num_buffers = 16;
+  int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
+  int64_t total_mem = 2 * 2 * max_buffer_len;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+
+  vector<BufferPool::BufferHandle> handles(num_buffers);
+
+  // Create buffers of various valid sizes.
+  for (int i = 0; i < num_buffers; ++i) {
+    int size_multiple = 1 << i;
+    int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
+    int64_t used_before = client_tracker->GetUsedReservation();
+    ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    ASSERT_TRUE(handles[i].is_open());
+    ASSERT_TRUE(handles[i].data() != NULL);
+    ASSERT_EQ(handles[i].len(), buffer_len);
+    ASSERT_EQ(used_before + buffer_len, client_tracker->GetUsedReservation());
+  }
+
+  // Close the handles and check memory consumption.
+  for (int i = 0; i < num_buffers; ++i) {
+    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t buffer_len = handles[i].len();
+    pool.FreeBuffer(&client, &handles[i]);
+    ASSERT_EQ(used_before - buffer_len, client_tracker->GetUsedReservation());
+  }
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+
+  // All the reservations should be released at this point.
+  EXPECT_EQ(0, global_reservations_.GetReservation());
+  global_reservations_.Close();
+}
+
+/// Test transfer of buffer handles between clients.
+TEST_F(BufferPoolTest, BufferTransfer) {
+  // Each client needs to have enough reservation for a buffer.
+  const int num_clients = 5;
+  int64_t total_mem = num_clients * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker client_trackers[num_clients];
+  BufferPool::Client clients[num_clients];
+  BufferPool::BufferHandle handles[num_clients];
+  for (int i = 0; i < num_clients; ++i) {
+    client_trackers[i].InitChildTracker(
+        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
+    ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
+    ASSERT_OK(pool.RegisterClient(
+        "test client", &client_trackers[i], NewProfile(), &clients[i]));
+  }
+
+  // Transfer the buffer around between the clients repeatedly in a circle.
+  ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0]));
+  uint8_t* data = handles[0].data();
+  for (int iter = 0; iter < 10; ++iter) {
+    for (int client = 0; client < num_clients; ++client) {
+      int next_client = (client + 1) % num_clients;
+      ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client],
+          &clients[next_client], &handles[next_client]));
+      // Check that the transfer left things in a consistent state.
+      ASSERT_FALSE(handles[client].is_open());
+      ASSERT_EQ(0, client_trackers[client].GetUsedReservation());
+      ASSERT_TRUE(handles[next_client].is_open());
+      ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation());
+      // The same underlying buffer should be used.
+      ASSERT_EQ(data, handles[next_client].data());
+    }
+  }
+
+  pool.FreeBuffer(&clients[0], &handles[0]);
+  for (int i = 0; i < num_clients; ++i) {
+    pool.DeregisterClient(&clients[i]);
+    client_trackers[i].Close();
+  }
+  EXPECT_EQ(0, global_reservations_.GetReservation());
+  global_reservations_.Close();
+}
+
+/// Test basic pinning and unpinning, including the reservation each pin consumes.
+TEST_F(BufferPoolTest, Pin) {
+  int64_t total_mem = TEST_BUFFER_LEN * 1024;
+  // Set up client with enough reservation to pin twice.
+  int64_t child_reservation = TEST_BUFFER_LEN * 2;
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(
+      NewProfile(), &global_reservations_, NULL, child_reservation);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+
+  BufferPool::PageHandle handle1, handle2;
+
+  // Can pin two minimum sized pages.
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+  ASSERT_TRUE(handle1.is_open());
+  ASSERT_TRUE(handle1.is_pinned());
+  ASSERT_TRUE(handle1.data() != NULL);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+  ASSERT_TRUE(handle2.is_open());
+  ASSERT_TRUE(handle2.is_pinned());
+  ASSERT_TRUE(handle2.data() != NULL);
+  ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
+
+  // Unpinning releases the page's reservation.
+  pool.Unpin(&client, &handle2);
+  ASSERT_FALSE(handle2.is_pinned());
+  ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation());
+
+  // Can pin minimum-sized page twice; each pin consumes reservation.
+  ASSERT_OK(pool.Pin(&client, &handle1));
+  ASSERT_TRUE(handle1.is_pinned());
+  ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
+  // Have to unpin twice.
+  pool.Unpin(&client, &handle1);
+  ASSERT_TRUE(handle1.is_pinned());
+  pool.Unpin(&client, &handle1);
+  ASSERT_FALSE(handle1.is_pinned());
+  ASSERT_EQ(0, client_tracker->GetUsedReservation());
+
+  // A double-sized page consumes the client's entire reservation.
+  BufferPool::PageHandle double_handle;
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle));
+  ASSERT_TRUE(double_handle.is_open());
+  ASSERT_TRUE(double_handle.is_pinned());
+  ASSERT_TRUE(double_handle.data() != NULL);
+  ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
+
+  // Destroy the pages - test destroying both pinned and unpinned.
+  pool.DestroyPage(&client, &handle1);
+  pool.DestroyPage(&client, &handle2);
+  pool.DestroyPage(&client, &double_handle);
+  ASSERT_EQ(0, client_tracker->GetUsedReservation());
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+}
+
+/// Creating a page or pinning without sufficient reservation should DCHECK.
+TEST_F(BufferPoolTest, PinWithoutReservation) {
+  int64_t total_mem = TEST_BUFFER_LEN * 1024;
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(
+      NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+
+  BufferPool::PageHandle handle;
+  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), "");
+
+  // Should succeed after increasing reservation.
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
+
+  // But we can't pin again.
+  IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), "");
+
+  pool.DestroyPage(&client, &handle);
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+}
+
+TEST_F(BufferPoolTest, ExtractBuffer) {
+  int64_t total_mem = TEST_BUFFER_LEN * 1024;
+  // Set up client with enough reservation for two buffers/pins.
+  int64_t child_reservation = TEST_BUFFER_LEN * 2;
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(
+      NewProfile(), &global_reservations_, NULL, child_reservation);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+
+  BufferPool::PageHandle page;
+  BufferPool::BufferHandle buffer;
+
+  // Test basic buffer extraction.
+  for (int64_t len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) {
+    ASSERT_OK(pool.CreatePage(&client, len, &page));
+    uint8_t* page_data = page.data();
+    pool.ExtractBuffer(&client, &page, &buffer);
+    ASSERT_FALSE(page.is_open());
+    ASSERT_TRUE(buffer.is_open());
+    ASSERT_EQ(len, buffer.len());
+    ASSERT_EQ(page_data, buffer.data());
+    ASSERT_EQ(len, client_tracker->GetUsedReservation());
+    pool.FreeBuffer(&client, &buffer);
+    ASSERT_EQ(0, client_tracker->GetUsedReservation());
+  }
+
+  // Test that ExtractBuffer() accounts correctly for pin count > 1.
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+  uint8_t* page_data = page.data();
+  ASSERT_OK(pool.Pin(&client, &page));
+  ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
+  pool.ExtractBuffer(&client, &page, &buffer);
+  ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation());
+  ASSERT_FALSE(page.is_open());
+  ASSERT_TRUE(buffer.is_open());
+  ASSERT_EQ(TEST_BUFFER_LEN, buffer.len());
+  ASSERT_EQ(page_data, buffer.data());
+  pool.FreeBuffer(&client, &buffer);
+  ASSERT_EQ(0, client_tracker->GetUsedReservation());
+
+  // Test that ExtractBuffer() DCHECKs for unpinned pages.
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+  pool.Unpin(&client, &page);
+  IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), "");
+  pool.DestroyPage(&client, &page);
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+}
+
+// Test concurrent creation and destruction of pages.
+TEST_F(BufferPoolTest, ConcurrentPageCreation) {
+  int ops_per_thread = 1024;
+  int num_threads = 64;
+  // Each thread needs enough reservation to pin one minimum-sized page at a time.
+  int64_t total_mem = num_threads * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+
+  // Launch threads, each creating and destroying pages through its own client.
+  thread_group workers;
+  for (int i = 0; i < num_threads; ++i) {
+    workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool,
+        &global_reservations_, ops_per_thread)));
+  }
+
+  // Build debug string to test concurrent iteration over pages_ list.
+  for (int i = 0; i < 64; ++i) {
+    LOG(INFO) << pool.DebugString();
+  }
+  workers.join_all();
+
+  // All the reservations should be released at this point.
+  EXPECT_EQ(0, global_reservations_.GetChildReservations());
+  global_reservations_.Close();
+}
+
+void BufferPoolTest::CreatePageLoop(
+    BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) {
+  ReservationTracker client_tracker;
+  client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN);
+  BufferPool::Client client;
+  ASSERT_OK(pool->RegisterClient("test client", &client_tracker, NewProfile(), &client));
+  for (int i = 0; i < num_ops; ++i) {
+    BufferPool::PageHandle handle;
+    ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN));
+    ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle));
+    pool->Unpin(&client, &handle);
+    ASSERT_OK(pool->Pin(&client, &handle));
+    pool->DestroyPage(&client, &handle);
+    client_tracker.DecreaseReservation(TEST_BUFFER_LEN);
+  }
+  pool->DeregisterClient(&client);
+  client_tracker.Close();
+}
+
+/// Test error path where pool is unable to fulfill a reservation because it cannot evict
+/// unpinned pages.
+TEST_F(BufferPoolTest, CapacityExhausted) {
+  global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
+  // TODO: once we enable spilling, set up buffer pool so that spilling is disabled.
+  // Set up pool with one more buffer than reservations (so that we hit the reservation
+  // limit instead of the buffer limit).
+  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2);
+
+  BufferPool::PageHandle handle1, handle2, handle3;
+
+  BufferPool::Client client;
+  ASSERT_OK(
+      pool.RegisterClient("test client", &global_reservations_, NewProfile(), &client));
+  ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+
+  // Do not have enough reservations because we pinned the page.
+  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), "");
+
+  // Even with reservations, we can only create one more unpinned page because we don't
+  // support eviction yet.
+  pool.Unpin(&client, &handle1);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+  pool.Unpin(&client, &handle2);
+  ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3).ok());
+
+  // After destroying a page, we should have a free buffer that we can use.
+  pool.DestroyPage(&client, &handle1);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3));
+
+  pool.DestroyPage(&client, &handle2);
+  pool.DestroyPage(&client, &handle3);
+  pool.DeregisterClient(&client);
+}
+} // namespace impala
+
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
new file mode 100644
index 0000000..1960bb0
--- /dev/null
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -0,0 +1,439 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/bufferpool/buffer-pool.h"
+
+#include <boost/bind.hpp>
+#include <limits>
+#include <sstream>
+
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "common/names.h"
+#include "gutil/strings/substitute.h"
+#include "util/bit-util.h"
+#include "util/runtime-profile-counters.h"
+#include "util/uid-util.h"
+
+namespace impala {
+
+/// The internal representation of a page, which can be pinned or unpinned. If the
+/// page is pinned, a buffer is associated with the page.
+///
+/// Code manipulating the page is responsible for acquiring 'lock' when reading or
+/// modifying the page.
+struct BufferPool::Page : public BufferPool::PageList::Node {
+  /// Creates a clean page of 'len' bytes with a pin count of zero. Explicit so that an
+  /// integer is never implicitly converted into a Page.
+  explicit Page(int64_t len) : len(len), pin_count(0), dirty(false) {}
+
+  /// Increment the pin count. Caller must hold 'lock'. 'handle' is currently unused.
+  void IncrementPinCount(PageHandle* handle) {
+    lock.DCheckLocked();
+    ++pin_count;
+    // Pinned page buffers may be modified by anyone with a pointer to the buffer, so we
+    // have to assume they are dirty.
+    dirty = true;
+  }
+
+  /// Decrement the pin count. Caller must hold 'lock' and the page must be pinned.
+  void DecrementPinCount(PageHandle* handle) {
+    lock.DCheckLocked();
+    DCHECK_GT(pin_count, 0);
+    --pin_count;
+  }
+
+  string DebugString() {
+    return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 dirty: $4",
+        this, len, pin_count, buffer.DebugString(), dirty);
+  }
+
+  // Helper for BufferPool::DebugString(). Appends this page's debug string to 'ss' while
+  // holding the page lock. Always returns true.
+  static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) {
+    lock_guard<SpinLock> pl(page->lock);
+    (*ss) << page->DebugString() << "\n";
+    return true;
+  }
+
+  /// The length of the page in bytes.
+  const int64_t len;
+
+  /// Lock to protect the below members of Page. The lock must be held when modifying any
+  /// of the below members and when reading any of the below members of an unpinned page.
+  SpinLock lock;
+
+  /// The pin count of the page.
+  int pin_count;
+
+  /// Buffer with the page's contents. Always open if pinned. Closed if page is unpinned
+  /// and was evicted from memory.
+  BufferHandle buffer;
+
+  /// True if the buffer's contents need to be saved before evicting it from memory.
+  bool dirty;
+};
+
+BufferPool::BufferHandle::BufferHandle() {
+  Reset();
+}
+
+BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) {
+  // Put this handle into the closed state first. This constructor does not otherwise
+  // initialize the members, and the move assignment below DCHECKs !is_open(), which
+  // would read indeterminate values.
+  Reset();
+  *this = std::move(src);
+}
+
+BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& src) {
+  DCHECK(!is_open());
+  // Copy over all members then close src.
+  client_ = src.client_;
+  data_ = src.data_;
+  len_ = src.len_;
+  src.Reset();
+  return *this;
+}
+
+/// Points this handle at 'len' bytes starting at 'data', owned by 'client'.
+void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, int64_t len) {
+  client_ = client;
+  data_ = data;
+  len_ = len;
+}
+
+/// Returns the handle to its closed state: no client, no data, length -1.
+void BufferPool::BufferHandle::Reset() {
+  client_ = NULL;
+  data_ = NULL;
+  len_ = -1;
+}
+
+BufferPool::PageHandle::PageHandle() {
+  Reset();
+}
+
+BufferPool::PageHandle::PageHandle(PageHandle&& src) {
+  // Put this handle into the closed state first. This constructor does not otherwise
+  // initialize the members, and the move assignment below DCHECKs !is_open(), which
+  // would read indeterminate values.
+  Reset();
+  *this = std::move(src);
+}
+
+BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) {
+  DCHECK(!is_open());
+  // Copy over all members then close src.
+  page_ = src.page_;
+  client_ = src.client_;
+  src.Reset();
+  return *this;
+}
+
+/// Attaches this (closed) handle to 'page' on behalf of 'client'. The caller must hold
+/// the page's lock.
+void BufferPool::PageHandle::Open(Page* page, Client* client) {
+  DCHECK(!is_open());
+  page->lock.DCheckLocked();
+  page_ = page;
+  client_ = client;
+}
+
+/// Detaches this handle from any page and client.
+void BufferPool::PageHandle::Reset() {
+  page_ = NULL;
+  client_ = NULL;
+}
+
+int BufferPool::PageHandle::pin_count() const {
+  DCHECK(is_open());
+  // The pin count can only be modified via this PageHandle, which must not be
+  // concurrently accessed by multiple threads, so it is safe to access without locking.
+  return page_->pin_count;
+}
+
+int64_t BufferPool::PageHandle::len() const {
+  DCHECK(is_open());
+  // The length of the page cannot change, so it is safe to access without locking.
+  return page_->len;
+}
+
+const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
+  DCHECK(is_pinned());
+  // The 'buffer' field cannot change while the page is pinned, so it is safe to access
+  // without locking.
+  return &page_->buffer;
+}
+
+/// Creates a pool whose buffers are power-of-two multiples of 'min_buffer_len' and
+/// which may hand out at most 'buffer_bytes_limit' bytes of buffers.
+BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
+  : allocator_(new BufferAllocator(min_buffer_len)),
+    min_buffer_len_(min_buffer_len),
+    buffer_bytes_limit_(buffer_bytes_limit),
+    buffer_bytes_remaining_(buffer_bytes_limit) {
+  // The minimum buffer length must be a positive power of two.
+  DCHECK_GT(min_buffer_len_, 0);
+  DCHECK_EQ(min_buffer_len_, BitUtil::RoundUpToPowerOfTwo(min_buffer_len_));
+}
+
+/// Every page must have been destroyed before the pool itself is destroyed.
+BufferPool::~BufferPool() {
+  DCHECK(pages_.empty());
+}
+
+/// Binds 'client' to 'reservation' under the display name 'name', creating the client's
+/// counters in 'profile'. 'client' must not already be registered.
+Status BufferPool::RegisterClient(
+    const string& name, ReservationTracker* reservation, RuntimeProfile* profile,
+    Client* client) {
+  DCHECK(!client->is_registered());
+  DCHECK(reservation != NULL);
+  client->InitCounters(profile);
+  client->name_ = name;
+  client->reservation_ = reservation;
+  return Status::OK();
+}
+
+/// Creates this client's timers and byte counters in 'profile' and stores pointers to
+/// them in 'counters_'.
+void BufferPool::Client::InitCounters(RuntimeProfile* profile) {
+  auto* const c = &counters_;
+  c->get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
+  c->read_wait_time = ADD_TIMER(profile, "BufferPoolReadWaitTime");
+  c->write_wait_time = ADD_TIMER(profile, "BufferPoolWriteWaitTime");
+  c->peak_unpinned_bytes =
+      profile->AddHighWaterMarkCounter("BufferPoolPeakUnpinnedBytes", TUnit::BYTES);
+  c->total_unpinned_bytes =
+      ADD_COUNTER(profile, "BufferPoolTotalUnpinnedBytes", TUnit::BYTES);
+}
+
+/// Closes the client's reservation tracker and clears the registration. A no-op for a
+/// client that is not registered.
+void BufferPool::DeregisterClient(Client* client) {
+  if (client->is_registered()) {
+    client->reservation_->Close();
+    client->name_.clear();
+    client->reservation_ = NULL;
+  }
+}
+
+/// Creates a new page of 'len' bytes pinned once by 'client' and opens 'handle' on it.
+/// 'len' must be a power of two no smaller than the minimum buffer length. Returns an
+/// error without modifying any state if a buffer cannot be allocated.
+Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) {
+  // Consistent with Pin()/UnpinLocked(): the client's reservation is used below.
+  DCHECK(client->is_registered());
+  DCHECK(!handle->is_open());
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  BufferHandle buffer;
+  // No changes have been made to state yet, so we can cleanly return on error.
+  RETURN_IF_ERROR(AllocateBufferInternal(client, len, &buffer));
+
+  Page* page = new Page(len);
+  {
+    lock_guard<SpinLock> pl(page->lock);
+    page->buffer = std::move(buffer);
+    handle->Open(page, client);
+    page->IncrementPinCount(handle);
+  }
+
+  // Only add to globally-visible list after page is initialized. The page lock also
+  // needs to be released before enqueueing to respect the lock ordering.
+  pages_.Enqueue(page);
+
+  // The page starts with one pin, which consumes 'len' bytes of the reservation.
+  client->reservation_->AllocateFrom(len);
+  return Status::OK();
+}
+
+/// Destroys the page referenced by 'handle', freeing its buffer and releasing any
+/// reservation its pins consumed. Closes 'handle'. Idempotent for closed handles.
+void BufferPool::DestroyPage(Client* client, PageHandle* handle) {
+  if (!handle->is_open()) return; // DestroyPage() should be idempotent.
+  DCHECK_EQ(handle->client_, client);
+
+  Page* page = handle->page_;
+  if (handle->is_pinned()) {
+    // In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work
+    // of cleaning up the page and freeing the buffer.
+    BufferHandle buffer;
+    ExtractBuffer(client, handle, &buffer);
+    FreeBuffer(client, &buffer);
+    return;
+  }
+
+  {
+    lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state.
+    // In the unpinned case, no reservation is consumed, so just free the buffer.
+    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
+    if (page->buffer.is_open()) FreeBufferInternal(&page->buffer);
+  }
+  // The page's bytes were added to the unpinned total when it was last unpinned; they
+  // leave that total now that the page is gone. This mirrors Pin(), which subtracts the
+  // page length when an unpinned page is pinned again.
+  COUNTER_ADD(client->counters_.peak_unpinned_bytes, -page->len);
+  CleanUpPage(handle);
+}
+
+/// Unlinks the page referenced by 'handle' from 'pages_', deletes it and closes
+/// 'handle'.
+void BufferPool::CleanUpPage(PageHandle* handle) {
+  Page* const doomed_page = handle->page_;
+  // Threads that reach pages through the 'pages_' list hold 'pages_.lock_', so Remove()
+  // only returns once no other thread can still be referencing the page. After that it
+  // is safe to delete it.
+  pages_.Remove(doomed_page);
+  handle->Reset();
+  delete doomed_page;
+}
+
+/// Adds a pin to the page referenced by 'handle', allocating a buffer for it first if
+/// it has none. Each pin consumes 'len' bytes of 'client's reservation. Returns an
+/// error, without modifying any state, if a buffer cannot be allocated.
+Status BufferPool::Pin(Client* client, PageHandle* handle) {
+  DCHECK(client->is_registered());
+  DCHECK(handle->is_open());
+  DCHECK_EQ(handle->client_, client);
+
+  Page* const page = handle->page_;
+  {
+    // Hold the page lock while inspecting and updating the page's state.
+    lock_guard<SpinLock> page_lock(page->lock);
+    const bool was_unpinned = (page->pin_count == 0);
+    if (was_unpinned && !page->buffer.is_open()) {
+      // The page has no buffer (e.g. it was evicted), so one must be allocated first.
+      // Nothing has been modified yet, so an error can be returned without cleanup.
+      RETURN_IF_ERROR(AllocateBufferInternal(client, page->len, &page->buffer));
+      // TODO: will need to initiate/wait for read if the page is not in-memory.
+    }
+    if (was_unpinned) {
+      // The page stops counting towards the client's unpinned bytes.
+      COUNTER_ADD(client->counters_.peak_unpinned_bytes, -handle->len());
+    }
+    page->IncrementPinCount(handle);
+  }
+
+  client->reservation_->AllocateFrom(page->len);
+  return Status::OK();
+}
+
+void BufferPool::Unpin(Client* client, PageHandle* handle) {
+  DCHECK(handle->is_open());
+  lock_guard<SpinLock> pl(handle->page_->lock);
+  UnpinLocked(client, handle);
+}
+
+void BufferPool::UnpinLocked(Client* client, PageHandle* handle) {
+  DCHECK(client->is_registered());
+  DCHECK_EQ(handle->client_, client);
+  // If handle is pinned, we can assume that the page itself is pinned.
+  DCHECK(handle->is_pinned());
+  Page* page = handle->page_;
+  page->lock.DCheckLocked();
+
+  page->DecrementPinCount(handle);
+  client->reservation_->ReleaseTo(page->len);
+
+  COUNTER_ADD(client->counters_.total_unpinned_bytes, handle->len());
+  COUNTER_ADD(client->counters_.peak_unpinned_bytes, handle->len());
+
+  // TODO: can evict now. Only need to preserve contents if 'page->dirty' is 
true.
+}
+
+void BufferPool::ExtractBuffer(
+    Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
+  DCHECK(page_handle->is_pinned());
+
+  DCHECK_EQ(page_handle->client_, client);
+
+  Page* page = page_handle->page_;
+  {
+    lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its 
state.
+    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
+
+    // Bring the pin count to 1 so that we're not using surplus reservations.
+    while (page->pin_count > 1) UnpinLocked(client, page_handle);
+    *buffer_handle = std::move(page->buffer);
+  }
+  CleanUpPage(page_handle);
+}
+
+Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* 
handle) {
+  client->reservation_->AllocateFrom(len);
+  return AllocateBufferInternal(client, len, handle);
+}
+
+Status BufferPool::AllocateBufferInternal(
+    Client* client, int64_t len, BufferHandle* buffer) {
+  DCHECK(!buffer->is_open());
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+  SCOPED_TIMER(client->counters_.get_buffer_time);
+
+  // If there is headroom in 'buffer_bytes_remaining_', we can just allocate a 
new buffer.
+  if (TryDecreaseBufferBytesRemaining(len)) {
+    uint8_t* data;
+    Status status = allocator_->Allocate(len, &data);
+    if (!status.ok()) {
+      buffer_bytes_remaining_.Add(len);
+      return status;
+    }
+    DCHECK(data != NULL);
+    buffer->Open(client, data, len);
+    return Status::OK();
+  }
+
+  // If there is no remaining capacity, we must evict another page.
+  return Status(TErrorCode::NOT_IMPLEMENTED_ERROR,
+      Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page 
eviction is "
+                 "not implemented yet!", buffer_bytes_limit_));
+}
+
+void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) {
+  if (!handle->is_open()) return; // Should be idempotent.
+  DCHECK_EQ(client, handle->client_);
+  client->reservation_->ReleaseTo(handle->len_);
+  FreeBufferInternal(handle);
+}
+
+void BufferPool::FreeBufferInternal(BufferHandle* handle) {
+  DCHECK(handle->is_open());
+  allocator_->Free(handle->data(), handle->len());
+  buffer_bytes_remaining_.Add(handle->len());
+  handle->Reset();
+}
+
+Status BufferPool::TransferBuffer(
+    Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* 
dst) {
+  DCHECK(src->is_open());
+  DCHECK(!dst->is_open());
+  DCHECK_EQ(src_client, src->client_);
+  DCHECK_NE(src, dst);
+  DCHECK_NE(src_client, dst_client);
+
+  dst_client->reservation_->AllocateFrom(src->len());
+  src_client->reservation_->ReleaseTo(src->len());
+  *dst = std::move(*src);
+  dst->client_ = dst_client;
+  return Status::OK();
+}
+
+bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) {
+  // TODO: we may want to change this policy so that we don't always use up to 
the limit
+  // for buffers, since this may starve other operators using non-buffer-pool 
memory.
+  while (true) {
+    int64_t old_value = buffer_bytes_remaining_.Load();
+    if (old_value < len) return false;
+    int64_t new_value = old_value - len;
+    if (buffer_bytes_remaining_.CompareAndSwap(old_value, new_value)) {
+      return true;
+    }
+  }
+}
+
+string BufferPool::Client::DebugString() const {
+  if (is_registered()) {
+    return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", 
this, name_,
+        reservation_->DebugString());
+  } else {
+    return Substitute("<BufferPool::Client> $0 UNREGISTERED", this);
+  }
+}
+
+string BufferPool::PageHandle::DebugString() const {
+  if (is_open()) {
+    lock_guard<SpinLock> pl(page_->lock);
+    return Substitute(
+        "<BufferPool::PageHandle> $0 client: {$1} page: {$2}",
+        this, client_->DebugString(), page_->DebugString());
+  } else {
+    return Substitute("<BufferPool::PageHandle> $0 CLOSED", this);
+  }
+}
+
+string BufferPool::BufferHandle::DebugString() const {
+  if (is_open()) {
+    return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 
len: $3", this,
+        client_->DebugString(), data_, len_);
+  } else {
+    return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this);
+  }
+}
+
+string BufferPool::DebugString() {
+  stringstream ss;
+  ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
+     << " buffer_bytes_limit: " << buffer_bytes_limit_
+     << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n";
+  pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
+  return ss.str();
+}
+}

Reply via email to