http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/reservation-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/reservation-tracker.cc b/be/src/bufferpool/reservation-tracker.cc deleted file mode 100644 index c5ed086..0000000 --- a/be/src/bufferpool/reservation-tracker.cc +++ /dev/null @@ -1,306 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "bufferpool/reservation-tracker.h" - -#include <algorithm> - -#include "common/object-pool.h" -#include "gutil/strings/substitute.h" -#include "runtime/mem-tracker.h" -#include "util/dummy-runtime-profile.h" -#include "util/runtime-profile-counters.h" - -#include "common/names.h" - -namespace impala { - -ReservationTracker::ReservationTracker() : initialized_(false), mem_tracker_(NULL) {} - -ReservationTracker::~ReservationTracker() { - DCHECK(!initialized_); -} - -void ReservationTracker::InitRootTracker( - RuntimeProfile* profile, int64_t reservation_limit) { - lock_guard<SpinLock> l(lock_); - DCHECK(!initialized_); - parent_ = NULL; - mem_tracker_ = NULL; - reservation_limit_ = reservation_limit; - reservation_ = 0; - used_reservation_ = 0; - child_reservations_ = 0; - initialized_ = true; - - InitCounters(profile, reservation_limit_); - COUNTER_SET(counters_.peak_reservation, reservation_); - - CheckConsistency(); -} - -void ReservationTracker::InitChildTracker(RuntimeProfile* profile, - ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) { - DCHECK(parent != NULL); - DCHECK_GE(reservation_limit, 0); - - lock_guard<SpinLock> l(lock_); - DCHECK(!initialized_); - parent_ = parent; - mem_tracker_ = mem_tracker; - - reservation_limit_ = reservation_limit; - reservation_ = 0; - used_reservation_ = 0; - child_reservations_ = 0; - initialized_ = true; - - if (mem_tracker_ != NULL) { - MemTracker* parent_mem_tracker = GetParentMemTracker(); - if (parent_mem_tracker != NULL) { - // Make sure the parent links of the MemTrackers correspond to our parent links. - DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent()); - // Make sure we don't have a lower limit than the ancestor, since we don't enforce - // limits at lower links. - DCHECK_EQ(mem_tracker_->lowest_limit(), parent_mem_tracker->lowest_limit()); - } else { - // Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent - // shouldn't have a MemTracker. 
- ReservationTracker* ancestor = parent_; - while (ancestor != NULL) { - DCHECK(ancestor->mem_tracker_ == NULL); - ancestor = ancestor->parent_; - } - } - } - - InitCounters(profile, reservation_limit_); - - CheckConsistency(); -} - -void ReservationTracker::InitCounters( - RuntimeProfile* profile, int64_t reservation_limit) { - bool profile_provided = profile != NULL; - if (profile == NULL) { - dummy_profile_.reset(new DummyProfile); - profile = dummy_profile_->profile(); - } - - // Check that another tracker's counters aren't already registered in the profile. - DCHECK(profile->GetCounter("BufferPoolInitialReservation") == NULL); - counters_.reservation_limit = - ADD_COUNTER(profile, "BufferPoolReservationLimit", TUnit::BYTES); - counters_.peak_reservation = - profile->AddHighWaterMarkCounter("BufferPoolPeakReservation", TUnit::BYTES); - counters_.peak_used_reservation = - profile->AddHighWaterMarkCounter("BufferPoolPeakUsedReservation", TUnit::BYTES); - - COUNTER_SET(counters_.reservation_limit, reservation_limit); - - if (mem_tracker_ != NULL && profile_provided) { - mem_tracker_->EnableReservationReporting(counters_); - } -} - -void ReservationTracker::Close() { - lock_guard<SpinLock> l(lock_); - if (!initialized_) return; - CheckConsistency(); - DCHECK_EQ(used_reservation_, 0); - DCHECK_EQ(child_reservations_, 0); - // Release any reservation to parent. 
- if (parent_ != NULL) DecreaseReservationInternalLocked(reservation_, false); - mem_tracker_ = NULL; - parent_ = NULL; - initialized_ = false; -} - -bool ReservationTracker::IncreaseReservation(int64_t bytes) { - lock_guard<SpinLock> l(lock_); - return IncreaseReservationInternalLocked(bytes, false, false); -} - -bool ReservationTracker::IncreaseReservationToFit(int64_t bytes) { - lock_guard<SpinLock> l(lock_); - return IncreaseReservationInternalLocked(bytes, true, false); -} - -bool ReservationTracker::IncreaseReservationInternalLocked( - int64_t bytes, bool use_existing_reservation, bool is_child_reservation) { - DCHECK(initialized_); - int64_t reservation_increase = - use_existing_reservation ? max<int64_t>(0, bytes - unused_reservation()) : bytes; - DCHECK_GE(reservation_increase, 0); - - bool granted; - // Check if the increase is allowed, starting at the bottom of hierarchy. - if (reservation_ + reservation_increase > reservation_limit_) { - granted = false; - } else if (reservation_increase == 0) { - granted = true; - } else { - if (parent_ == NULL) { - granted = true; - } else { - lock_guard<SpinLock> l(parent_->lock_); - granted = - parent_->IncreaseReservationInternalLocked(reservation_increase, true, true); - } - if (granted && !TryUpdateMemTracker(reservation_increase)) { - granted = false; - // Roll back changes to ancestors if MemTracker update fails. - parent_->DecreaseReservationInternal(reservation_increase, true); - } - } - - if (granted) { - // The reservation was granted and state updated in all ancestors: we can modify - // this tracker's state now. 
- UpdateReservation(reservation_increase); - if (is_child_reservation) child_reservations_ += bytes; - } - - CheckConsistency(); - return granted; -} - -bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase) { - if (mem_tracker_ == NULL) return true; - if (GetParentMemTracker() == NULL) { - // At the topmost link, which may be a MemTracker with a limit, we need to use - // TryConsume() to check the limit. - return mem_tracker_->TryConsume(reservation_increase); - } else { - // For lower links, there shouldn't be a limit to enforce, so we just need to - // update the consumption of the linked MemTracker since the reservation is - // already reflected in its parent. - mem_tracker_->ConsumeLocal(reservation_increase, GetParentMemTracker()); - return true; - } -} - -void ReservationTracker::DecreaseReservation(int64_t bytes) { - DecreaseReservationInternal(bytes, false); -} - -void ReservationTracker::DecreaseReservationInternal( - int64_t bytes, bool is_child_reservation) { - lock_guard<SpinLock> l(lock_); - DecreaseReservationInternalLocked(bytes, is_child_reservation); -} - -void ReservationTracker::DecreaseReservationInternalLocked( - int64_t bytes, bool is_child_reservation) { - DCHECK(initialized_); - DCHECK_GE(reservation_, bytes); - if (bytes == 0) return; - if (is_child_reservation) child_reservations_ -= bytes; - UpdateReservation(-bytes); - // The reservation should be returned up the tree. 
- if (mem_tracker_ != NULL) { - if (GetParentMemTracker() == NULL) { - mem_tracker_->Release(bytes); - } else { - mem_tracker_->ReleaseLocal(bytes, GetParentMemTracker()); - } - } - if (parent_ != NULL) parent_->DecreaseReservationInternal(bytes, true); - CheckConsistency(); -} - -void ReservationTracker::AllocateFrom(int64_t bytes) { - lock_guard<SpinLock> l(lock_); - DCHECK(initialized_); - DCHECK_GE(bytes, 0); - DCHECK_LE(bytes, unused_reservation()); - UpdateUsedReservation(bytes); - CheckConsistency(); -} - -void ReservationTracker::ReleaseTo(int64_t bytes) { - lock_guard<SpinLock> l(lock_); - DCHECK(initialized_); - DCHECK_GE(bytes, 0); - DCHECK_LE(bytes, used_reservation_); - UpdateUsedReservation(-bytes); - CheckConsistency(); -} - -int64_t ReservationTracker::GetReservation() { - lock_guard<SpinLock> l(lock_); - DCHECK(initialized_); - return reservation_; -} - -int64_t ReservationTracker::GetUsedReservation() { - lock_guard<SpinLock> l(lock_); - DCHECK(initialized_); - return used_reservation_; -} - -int64_t ReservationTracker::GetUnusedReservation() { - lock_guard<SpinLock> l(lock_); - DCHECK(initialized_); - return unused_reservation(); -} - -int64_t ReservationTracker::GetChildReservations() { - lock_guard<SpinLock> l(lock_); - DCHECK(initialized_); - return child_reservations_; -} - -void ReservationTracker::CheckConsistency() const { - // Check internal invariants. 
- DCHECK_GE(reservation_, 0); - DCHECK_LE(reservation_, reservation_limit_); - DCHECK_GE(child_reservations_, 0); - DCHECK_GE(used_reservation_, 0); - DCHECK_LE(used_reservation_ + child_reservations_, reservation_); - - DCHECK_EQ(reservation_, counters_.peak_reservation->current_value()); - DCHECK_LE(reservation_, counters_.peak_reservation->value()); - DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value()); - DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value()); - DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value()); -} - -void ReservationTracker::UpdateUsedReservation(int64_t delta) { - used_reservation_ += delta; - COUNTER_SET(counters_.peak_used_reservation, used_reservation_); - CheckConsistency(); -} - -void ReservationTracker::UpdateReservation(int64_t delta) { - reservation_ += delta; - COUNTER_SET(counters_.peak_reservation, reservation_); - CheckConsistency(); -} - -string ReservationTracker::DebugString() { - lock_guard<SpinLock> l(lock_); - if (!initialized_) return "<ReservationTracker>: uninitialized"; - - string parent_debug_string = parent_ == NULL ? "NULL" : parent_->DebugString(); - return Substitute( - "<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 " - "child_reservations $3 parent:\n$4", - reservation_limit_, reservation_, used_reservation_, child_reservations_, - parent_debug_string); -} -}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/reservation-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/reservation-tracker.h b/be/src/bufferpool/reservation-tracker.h deleted file mode 100644 index 6bdecf0..0000000 --- a/be/src/bufferpool/reservation-tracker.h +++ /dev/null @@ -1,248 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_RESERVATION_TRACKER_H -#define IMPALA_RESERVATION_TRACKER_H - -#include <stdint.h> -#include <boost/scoped_ptr.hpp> -#include <boost/thread/locks.hpp> -#include <string> - -#include "bufferpool/reservation-tracker-counters.h" -#include "common/status.h" -#include "util/spinlock.h" - -namespace impala { - -class DummyProfile; -class MemTracker; -class RuntimeProfile; - -/// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes. -/// A hierarchy of ReservationTrackers provides a mechanism for subdividing buffer pool -/// memory and enforcing upper and lower bounds on memory usage. -/// -/// The root of the tracker tree enforces a global maximum, which is distributed among its -/// children. 
Each tracker in the tree has a 'reservation': the total bytes of buffer pool -/// memory it is entitled to use. The reservation is inclusive of any memory that is -/// already allocated from the reservation, i.e. using a reservation to allocate memory -/// does not subtract from the reservation. -/// -/// A reservation can be used directly at the tracker by calling AllocateFrom(), or -/// distributed to children of the tracker for the children's reservations. Each tracker -/// in the tree can use up to its reservation without checking parent trackers. To -/// increase its reservation, a tracker must use some of its parent's reservation (and -/// perhaps increase reservations all the way to the root of the tree). -/// -/// Each tracker also has a maximum reservation that is enforced. E.g. if the root of the -/// tracker hierarchy is the global tracker for the Impala daemon and the next level of -/// the hierarchy is made up of per-query trackers, then the maximum reservation -/// mechanism can enforce both process-level and query-level limits on reservations. -/// -/// Invariants: -/// * A tracker's reservation is at most its reservation limit: reservation <= limit -/// * A tracker's reservation is at least the sum of its children's reservations plus -/// the amount of the reservation used directly at this tracker. The difference is -/// the unused reservation: -/// child_reservations + used_reservation + unused_reservation = reservation. -/// -/// Thread-safety: -/// All public ReservationTracker methods are thread-safe. If multiple threads -/// concurrently invoke methods on a ReservationTracker, each operation is applied -/// atomically to leave the ReservationTracker in a consistent state. Calling threads -/// are responsible for coordinating to avoid violating any method preconditions, -/// e.g. ensuring that there is sufficient unused reservation before calling AllocateFrom(). 
-/// -/// Integration with MemTracker hierarchy: -/// TODO: we will remove MemTracker and this integration once all memory is accounted via -/// reservations. -/// -/// Each ReservationTracker can optionally have a linked MemTracker. E.g. an exec -/// node's ReservationTracker can be linked with the exec node's MemTracker, so that -/// reservations are included in query memory consumption for the purposes of enforcing -/// memory limits, reporting and logging. The reservation is accounted as consumption -/// against the linked MemTracker and its ancestors because reserved memory is committed. -/// Allocating from a reservation therefore does not change the consumption reflected in -/// the MemTracker hierarchy. -/// -/// MemTracker limits are only checked via the topmost link (i.e. the query-level -/// trackers): we require that no MemTrackers below this level have limits. -/// -/// We require that the MemTracker hierarchy is consistent with the ReservationTracker -/// hierarchy. I.e. if a ReservationTracker is linked to a MemTracker "A", and its parent -/// is linked to a MemTracker "B", then "B" must be the parent of "A". -class ReservationTracker { - public: - ReservationTracker(); - virtual ~ReservationTracker(); - - /// Initializes the root tracker with the given reservation limit in bytes. The initial - /// reservation is 0. - /// If 'profile' is not NULL, the counters defined in ReservationTrackerCounters are - /// added to 'profile'. - void InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit); - - /// Initializes a new ReservationTracker with a parent. - /// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its - /// children will be counted as consumption against 'mem_tracker'. - /// 'reservation_limit' is the maximum reservation for this tracker in bytes. - /// If 'profile' is not NULL, the counters in 'counters_' are added to 'profile'. 
- void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent, - MemTracker* mem_tracker, int64_t reservation_limit); - - /// If the tracker is initialized, deregister the ReservationTracker from its parent, - /// relinquishing all this tracker's reservation. All of the reservation must be unused - /// and all the tracker's children must be closed before calling this method. - void Close(); - - /// Request to increase reservation by 'bytes'. The request is either granted in - /// full or not at all. Uses any unused reservation on ancestors and increase - /// ancestors' reservations if needed to fit the increased reservation. - /// Returns true if the reservation increase is granted, or false if not granted. - /// If the reservation is not granted, no modifications are made to the state of - /// any ReservationTrackers. - bool IncreaseReservation(int64_t bytes); - - /// Tries to ensure that 'bytes' of unused reservation is available. If not already - /// available, tries to increase the reservation such that the unused reservation is - /// exactly equal to 'bytes'. Uses any unused reservation on ancestors and increase - /// ancestors' reservations if needed to fit the increased reservation. - /// Returns true if the reservation increase was successful or not necessary. - bool IncreaseReservationToFit(int64_t bytes); - - /// Decrease tracker's reservation by 'bytes'. This tracker's reservation must be at - /// least 'bytes' before calling this method. - /// TODO: decide on and implement policy for how far to release the reservation up - /// the tree. Currently the reservation is released all the way to the root. - void DecreaseReservation(int64_t bytes); - - /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes' - /// unused reservation before calling this method. - void AllocateFrom(int64_t bytes); - - /// Release 'bytes' of previously allocated memory. The used reservation is - /// decreased by 'bytes'. 
The used reservation must be at least - /// 'bytes' before calling this method. - void ReleaseTo(int64_t bytes); - - /// Returns the amount of the reservation in bytes. - int64_t GetReservation(); - - /// Returns the current amount of the reservation used at this tracker, not including - /// reservations of children in bytes. - int64_t GetUsedReservation(); - - /// Returns the amount of the reservation neither used nor given to children's - /// reservations at this tracker in bytes. - int64_t GetUnusedReservation(); - - /// Returns the total reservations of children in bytes. - int64_t GetChildReservations(); - - std::string DebugString(); - - private: - /// Returns the amount of 'reservation_' that is unused. - inline int64_t unused_reservation() const { - return reservation_ - used_reservation_ - child_reservations_; - } - - /// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise. - MemTracker* GetParentMemTracker() const { - return parent_ == NULL ? NULL : parent_->mem_tracker_; - } - - /// Initializes 'counters_', storing the counters in 'profile'. - /// If 'profile' is NULL, creates a dummy profile to store the counters. - void InitCounters(RuntimeProfile* profile, int64_t max_reservation); - - /// Internal helper for IncreaseReservation(). If 'use_existing_reservation' is true, - /// increase by the minimum amount so that 'bytes' fits in the reservation, otherwise - /// just increase by 'bytes'. If 'is_child_reservation' is true, also increase - /// 'child_reservations_' by 'bytes'. - /// 'lock_' must be held by caller. - bool IncreaseReservationInternalLocked( - int64_t bytes, bool use_existing_reservation, bool is_child_reservation); - - /// Update consumption on linked MemTracker. For the topmost link, return false if - /// this failed because it would exceed a memory limit. If there is no linked - /// MemTracker, just returns true. - /// TODO: remove once we account all memory via ReservationTrackers. 
- bool TryUpdateMemTracker(int64_t reservation_increase); - - /// Internal helper for DecreaseReservation(). This behaves the same as - /// DecreaseReservation(), except when 'is_child_reservation' is true it decreases - /// 'child_reservations_' by 'bytes'. - void DecreaseReservationInternal(int64_t bytes, bool is_child_reservation); - - /// Same as DecreaseReservationInternal(), but 'lock_' must be held by caller. - void DecreaseReservationInternalLocked(int64_t bytes, bool is_child_reservation); - - /// Check the internal consistency of the ReservationTracker and DCHECKs if in an - /// inconsistent state. - /// 'lock_' must be held by caller. - void CheckConsistency() const; - - /// Increase or decrease 'used_reservation_' and update profile counters accordingly. - /// 'lock_' must be held by caller. - void UpdateUsedReservation(int64_t delta); - - /// Increase or decrease 'reservation_' and update profile counters accordingly. - /// 'lock_' must be held by caller. - void UpdateReservation(int64_t delta); - - /// lock_ protects all members. In a hierarchy of trackers, locks must be acquired - /// from the bottom-up. - SpinLock lock_; - - /// True if the tracker is initialized. - bool initialized_; - - /// A dummy profile to hold the counters in 'counters_' in the case that no profile - /// is provided. - boost::scoped_ptr<DummyProfile> dummy_profile_; - - /// The RuntimeProfile counters for this tracker. - /// All non-NULL if 'initialized_' is true. - ReservationTrackerCounters counters_; - - /// The parent of this tracker in the hierarchy. Does not change after initialization. - ReservationTracker* parent_; - - /// If non-NULL, reservations are counted as memory consumption against this tracker. - /// Does not change after initialization. Not owned. - /// TODO: remove once all memory is accounted via ReservationTrackers. - MemTracker* mem_tracker_; - - /// The maximum reservation in bytes that this tracker can have. 
- int64_t reservation_limit_; - - /// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'. - int64_t reservation_; - - /// Total reservation of children in bytes. This is included in 'reservation_'. - /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. - int64_t child_reservations_; - - /// The amount of the reservation currently used by this tracker in bytes. - /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. - int64_t used_reservation_; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 19db090..54a1347 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +add_subdirectory(bufferpool) # where to put generated libraries set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime") http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/CMakeLists.txt b/be/src/runtime/bufferpool/CMakeLists.txt new file mode 100644 index 0000000..758d538 --- /dev/null +++ b/be/src/runtime/bufferpool/CMakeLists.txt @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/bufferpool") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/bufferpool") + +add_library(BufferPool + buffer-allocator.cc + buffer-pool.cc + reservation-tracker.cc +) +add_dependencies(BufferPool thrift-deps) + +ADD_BE_TEST(buffer-pool-test) +ADD_BE_TEST(reservation-tracker-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-allocator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-allocator.cc b/be/src/runtime/bufferpool/buffer-allocator.cc new file mode 100644 index 0000000..3befe76 --- /dev/null +++ b/be/src/runtime/bufferpool/buffer-allocator.cc @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/bufferpool/buffer-allocator.h" + +#include "util/bit-util.h" + +namespace impala { + +BufferAllocator::BufferAllocator(int64_t min_buffer_len) + : min_buffer_len_(min_buffer_len) {} + +Status BufferAllocator::Allocate(int64_t len, uint8_t** buffer) { + DCHECK_GE(len, min_buffer_len_); + DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); + + *buffer = reinterpret_cast<uint8_t*>(malloc(len)); + if (*buffer == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len); + return Status::OK(); +} + +void BufferAllocator::Free(uint8_t* buffer, int64_t len) { + free(buffer); +} +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-allocator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h new file mode 100644 index 0000000..54b667a --- /dev/null +++ b/be/src/runtime/bufferpool/buffer-allocator.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef IMPALA_RUNTIME_BUFFER_ALLOCATOR_H +#define IMPALA_RUNTIME_BUFFER_ALLOCATOR_H + +#include "common/status.h" + +namespace impala { + +/// The underlying memory allocator for the buffer pool. All buffers are allocated through +/// the BufferPool's BufferAllocator. The allocator only handles allocating buffers that +/// are power-of-two multiples of the minimum buffer length. +/// +/// TODO: +/// * Allocate memory with mmap() instead of malloc(). +/// * Implement free lists in the allocator or external to the allocator. +class BufferAllocator { + public: + BufferAllocator(int64_t min_buffer_len); + + /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple + /// of the minimum buffer length. + Status Allocate(int64_t len, uint8_t** buffer); + + /// Free the memory for a previously-allocated buffer. + void Free(uint8_t* buffer, int64_t len); + + private: + const int64_t min_buffer_len_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-pool-counters.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-counters.h b/be/src/runtime/bufferpool/buffer-pool-counters.h new file mode 100644 index 0000000..e4b1c36 --- /dev/null +++ b/be/src/runtime/bufferpool/buffer-pool-counters.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_BUFFER_POOL_COUNTERS_H +#define IMPALA_RUNTIME_BUFFER_POOL_COUNTERS_H + +#include "util/runtime-profile.h" + +namespace impala { + +/// A set of counters for each buffer pool client. +struct BufferPoolClientCounters { + public: + /// Amount of time spent trying to get a buffer. + RuntimeProfile::Counter* get_buffer_time; + + /// Amount of time spent waiting for reads from disk to complete. + RuntimeProfile::Counter* read_wait_time; + + /// Amount of time spent waiting for writes to disk to complete. + RuntimeProfile::Counter* write_wait_time; + + /// The peak total size of unpinned buffers. + RuntimeProfile::HighWaterMarkCounter* peak_unpinned_bytes; + + /// The total bytes of data unpinned. Every time a page's pin count goes from 1 to 0, + /// this counter is incremented by the page size. + RuntimeProfile::Counter* total_unpinned_bytes; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc new file mode 100644 index 0000000..af84bbe --- /dev/null +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -0,0 +1,554 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <boost/bind.hpp> +#include <boost/scoped_ptr.hpp> +#include <boost/thread/thread.hpp> +#include <boost/unordered_map.hpp> +#include <cstdlib> +#include <string> +#include <vector> + +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/reservation-tracker.h" +#include "common/init.h" +#include "common/object-pool.h" +#include "testutil/death-test-util.h" +#include "testutil/gtest-util.h" + +#include "common/names.h" + +namespace impala { + +class BufferPoolTest : public ::testing::Test { + public: + virtual void SetUp() {} + + virtual void TearDown() { + for (auto entry : query_reservations_) { + ReservationTracker* tracker = entry.second; + tracker->Close(); + } + + global_reservations_.Close(); + obj_pool_.Clear(); + } + + /// The minimum buffer size used in most tests. + const static int64_t TEST_BUFFER_LEN = 1024; + + /// Test helper to simulate registering then deregistering a number of queries with + /// the given initial reservation and reservation limit. + void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries, + int64_t initial_query_reservation, int64_t query_reservation_limit); + + /// Create and destroy a page multiple times. 
+ void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, int num_ops); + + protected: + static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; } + + /// Helper function to create one reservation tracker per query. + ReservationTracker* GetQueryReservationTracker(int64_t query_id) { + lock_guard<SpinLock> l(query_reservations_lock_); + ReservationTracker* tracker = query_reservations_[query_id]; + if (tracker != NULL) return tracker; + tracker = obj_pool_.Add(new ReservationTracker()); + query_reservations_[query_id] = tracker; + return tracker; + } + + RuntimeProfile* NewProfile() { + return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile")); + } + + ObjectPool obj_pool_; + + ReservationTracker global_reservations_; + + // Map from query_id to the reservation tracker for that query. Reads and modifications + // of the map are protected by query_reservations_lock_. + unordered_map<int64_t, ReservationTracker*> query_reservations_; + SpinLock query_reservations_lock_; +}; + +const int64_t BufferPoolTest::TEST_BUFFER_LEN; + +void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, + int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) { + Status status; + + int clients_per_query = 32; + BufferPool::Client* clients[num_queries]; + ReservationTracker* client_reservations[num_queries]; + + for (int i = 0; i < num_queries; ++i) { + int64_t query_id = QueryId(query_id_hi, i); + + // Initialize a tracker for a new query. + ReservationTracker* query_reservation = GetQueryReservationTracker(query_id); + query_reservation->InitChildTracker( + NULL, &global_reservations_, NULL, query_reservation_limit); + + // Test that closing then reopening child tracker works. 
+ query_reservation->Close(); + query_reservation->InitChildTracker( + NULL, &global_reservations_, NULL, query_reservation_limit); + EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation)); + + clients[i] = new BufferPool::Client[clients_per_query]; + client_reservations[i] = new ReservationTracker[clients_per_query]; + + for (int j = 0; j < clients_per_query; ++j) { + int64_t initial_client_reservation = + initial_query_reservation / clients_per_query + j + < initial_query_reservation % clients_per_query; + // Reservation limit can be anything greater or equal to the initial reservation. + int64_t client_reservation_limit = initial_client_reservation + rand() % 100000; + client_reservations[i][j].InitChildTracker( + NULL, query_reservation, NULL, client_reservation_limit); + EXPECT_TRUE( + client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation)); + string name = Substitute("Client $0 for query $1", j, query_id); + EXPECT_OK(pool->RegisterClient( + name, &client_reservations[i][j], NewProfile(), &clients[i][j])); + } + + for (int j = 0; j < clients_per_query; ++j) { + ASSERT_TRUE(clients[i][j].is_registered()); + } + } + + // Deregister clients then query. + for (int i = 0; i < num_queries; ++i) { + for (int j = 0; j < clients_per_query; ++j) { + pool->DeregisterClient(&clients[i][j]); + ASSERT_FALSE(clients[i][j].is_registered()); + client_reservations[i][j].Close(); + } + + delete[] clients[i]; + delete[] client_reservations[i]; + + GetQueryReservationTracker(QueryId(query_id_hi, i))->Close(); + } +} + +/// Test that queries and clients can be registered and deregistered with the reservation +/// trackers and the buffer pool. +TEST_F(BufferPoolTest, BasicRegistration) { + int num_concurrent_queries = 1024; + int64_t sum_initial_reservations = 4; + int64_t reservation_limit = 1024; + // Need enough buffers for all initial reservations. 
+ int64_t total_mem = sum_initial_reservations * num_concurrent_queries; + global_reservations_.InitRootTracker(NewProfile(), total_mem); + + BufferPool pool(TEST_BUFFER_LEN, total_mem); + + RegisterQueriesAndClients( + &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit); + + DCHECK_EQ(global_reservations_.GetUsedReservation(), 0); + DCHECK_EQ(global_reservations_.GetChildReservations(), 0); + DCHECK_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +/// Test that queries and clients can be registered and deregistered by concurrent +/// threads. +TEST_F(BufferPoolTest, ConcurrentRegistration) { + int queries_per_thread = 64; + int num_threads = 64; + int num_concurrent_queries = queries_per_thread * num_threads; + int64_t sum_initial_reservations = 4; + int64_t reservation_limit = 1024; + // Need enough buffers for all initial reservations. + int64_t total_mem = num_concurrent_queries * sum_initial_reservations; + global_reservations_.InitRootTracker(NewProfile(), total_mem); + + BufferPool pool(TEST_BUFFER_LEN, total_mem); + + // Launch threads, each with a different set of query IDs. + thread_group workers; + for (int i = 0; i < num_threads; ++i) { + workers.add_thread(new thread(bind(&BufferPoolTest::RegisterQueriesAndClients, this, + &pool, i, queries_per_thread, sum_initial_reservations, reservation_limit))); + } + workers.join_all(); + + // All the reservations should be released at this point. + DCHECK_EQ(global_reservations_.GetUsedReservation(), 0); + DCHECK_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +/// Test basic page handle creation. +TEST_F(BufferPoolTest, PageCreation) { + // Allocate many pages, each a power-of-two multiple of the minimum page length. 
+ int num_pages = 16; + int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1); + int64_t total_mem = 2 * 2 * max_page_len; + global_reservations_.InitRootTracker(NULL, total_mem); + BufferPool pool(TEST_BUFFER_LEN, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem); + ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem)); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); + + vector<BufferPool::PageHandle> handles(num_pages); + + // Create pages of various valid sizes. + for (int i = 0; i < num_pages; ++i) { + int size_multiple = 1 << i; + int64_t page_len = TEST_BUFFER_LEN * size_multiple; + int64_t used_before = client_tracker->GetUsedReservation(); + ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i])); + ASSERT_TRUE(handles[i].is_open()); + ASSERT_TRUE(handles[i].is_pinned()); + ASSERT_TRUE(handles[i].buffer_handle() != NULL); + ASSERT_TRUE(handles[i].data() != NULL); + ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data()); + ASSERT_EQ(handles[i].len(), page_len); + ASSERT_EQ(handles[i].buffer_handle()->len(), page_len); + DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + page_len); + } + + // Close the handles and check memory consumption. + for (int i = 0; i < num_pages; ++i) { + int64_t used_before = client_tracker->GetUsedReservation(); + int page_len = handles[i].len(); + pool.DestroyPage(&client, &handles[i]); + DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - page_len); + } + + pool.DeregisterClient(&client); + client_tracker->Close(); + + // All the reservations should be released at this point. 
+ DCHECK_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +TEST_F(BufferPoolTest, BufferAllocation) { + // Allocate many buffers, each a power-of-two multiple of the minimum buffer length. + int num_buffers = 16; + int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1); + int64_t total_mem = 2 * 2 * max_buffer_len; + global_reservations_.InitRootTracker(NULL, total_mem); + BufferPool pool(TEST_BUFFER_LEN, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem); + ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem)); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); + + vector<BufferPool::BufferHandle> handles(num_buffers); + + // Create buffers of various valid sizes. + for (int i = 0; i < num_buffers; ++i) { + int size_multiple = 1 << i; + int64_t buffer_len = TEST_BUFFER_LEN * size_multiple; + int64_t used_before = client_tracker->GetUsedReservation(); + ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i])); + ASSERT_TRUE(handles[i].is_open()); + ASSERT_TRUE(handles[i].data() != NULL); + ASSERT_EQ(handles[i].len(), buffer_len); + DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len); + } + + // Close the handles and check memory consumption. + for (int i = 0; i < num_buffers; ++i) { + int64_t used_before = client_tracker->GetUsedReservation(); + int buffer_len = handles[i].len(); + pool.FreeBuffer(&client, &handles[i]); + DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len); + } + + pool.DeregisterClient(&client); + client_tracker->Close(); + + // All the reservations should be released at this point. + DCHECK_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +/// Test transfer of buffer handles between clients. 
+TEST_F(BufferPoolTest, BufferTransfer) { + // Each client needs to have enough reservation for a buffer. + const int num_clients = 5; + int64_t total_mem = num_clients * TEST_BUFFER_LEN; + global_reservations_.InitRootTracker(NULL, total_mem); + BufferPool pool(TEST_BUFFER_LEN, total_mem); + ReservationTracker client_trackers[num_clients]; + BufferPool::Client clients[num_clients]; + BufferPool::BufferHandle handles[num_clients]; + for (int i = 0; i < num_clients; ++i) { + client_trackers[i].InitChildTracker( + NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN); + ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN)); + ASSERT_OK(pool.RegisterClient( + "test client", &client_trackers[i], NewProfile(), &clients[i])); + } + + // Transfer the page around between the clients repeatedly in a circle. + ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0])); + uint8_t* data = handles[0].data(); + for (int iter = 0; iter < 10; ++iter) { + for (int client = 0; client < num_clients; ++client) { + int next_client = (client + 1) % num_clients; + ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client], + &clients[next_client], &handles[next_client])); + // Check that the transfer left things in a consistent state. + ASSERT_FALSE(handles[client].is_open()); + ASSERT_EQ(0, client_trackers[client].GetUsedReservation()); + ASSERT_TRUE(handles[next_client].is_open()); + ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation()); + // The same underlying buffer should be used. + ASSERT_EQ(data, handles[next_client].data()); + } + } + + pool.FreeBuffer(&clients[0], &handles[0]); + for (int i = 0; i < num_clients; ++i) { + pool.DeregisterClient(&clients[i]); + client_trackers[i].Close(); + } + DCHECK_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +/// Test basic pinning and unpinning. 
+TEST_F(BufferPoolTest, Pin) { + int64_t total_mem = TEST_BUFFER_LEN * 1024; + // Set up client with enough reservation to pin twice. + int64_t child_reservation = TEST_BUFFER_LEN * 2; + BufferPool pool(TEST_BUFFER_LEN, total_mem); + global_reservations_.InitRootTracker(NULL, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker( + NewProfile(), &global_reservations_, NULL, child_reservation); + ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation)); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); + + BufferPool::PageHandle handle1, handle2; + + // Can pin two minimum sized pages. + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); + ASSERT_TRUE(handle1.is_open()); + ASSERT_TRUE(handle1.is_pinned()); + ASSERT_TRUE(handle1.data() != NULL); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)); + ASSERT_TRUE(handle2.is_open()); + ASSERT_TRUE(handle2.is_pinned()); + ASSERT_TRUE(handle2.data() != NULL); + + pool.Unpin(&client, &handle2); + ASSERT_FALSE(handle2.is_pinned()); + + // Can pin minimum-sized page twice. + ASSERT_OK(pool.Pin(&client, &handle1)); + ASSERT_TRUE(handle1.is_pinned()); + // Have to unpin twice. + pool.Unpin(&client, &handle1); + ASSERT_TRUE(handle1.is_pinned()); + pool.Unpin(&client, &handle1); + ASSERT_FALSE(handle1.is_pinned()); + + // Can pin double-sized page only once. + BufferPool::PageHandle double_handle; + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle)); + ASSERT_TRUE(double_handle.is_open()); + ASSERT_TRUE(double_handle.is_pinned()); + ASSERT_TRUE(double_handle.data() != NULL); + + // Destroy the pages - test destroying both pinned and unpinned. 
+ pool.DestroyPage(&client, &handle1); + pool.DestroyPage(&client, &handle2); + pool.DestroyPage(&client, &double_handle); + + pool.DeregisterClient(&client); + client_tracker->Close(); +} + +/// Creating a page or pinning without sufficient reservation should DCHECK. +TEST_F(BufferPoolTest, PinWithoutReservation) { + int64_t total_mem = TEST_BUFFER_LEN * 1024; + BufferPool pool(TEST_BUFFER_LEN, total_mem); + global_reservations_.InitRootTracker(NULL, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker( + NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); + + BufferPool::PageHandle handle; + IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), ""); + + // Should succeed after increasing reservation. + ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN)); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)); + + // But we can't pin again. + IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), ""); + + pool.DestroyPage(&client, &handle); + pool.DeregisterClient(&client); + client_tracker->Close(); +} + +TEST_F(BufferPoolTest, ExtractBuffer) { + int64_t total_mem = TEST_BUFFER_LEN * 1024; + // Set up client with enough reservation for two buffers/pins. 
+ int64_t child_reservation = TEST_BUFFER_LEN * 2; + BufferPool pool(TEST_BUFFER_LEN, total_mem); + global_reservations_.InitRootTracker(NULL, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker( + NewProfile(), &global_reservations_, NULL, child_reservation); + ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation)); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); + + BufferPool::PageHandle page; + BufferPool::BufferHandle buffer; + + // Test basic buffer extraction. + for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) { + ASSERT_OK(pool.CreatePage(&client, len, &page)); + uint8_t* page_data = page.data(); + pool.ExtractBuffer(&client, &page, &buffer); + ASSERT_FALSE(page.is_open()); + ASSERT_TRUE(buffer.is_open()); + ASSERT_EQ(len, buffer.len()); + ASSERT_EQ(page_data, buffer.data()); + ASSERT_EQ(len, client_tracker->GetUsedReservation()); + pool.FreeBuffer(&client, &buffer); + ASSERT_EQ(0, client_tracker->GetUsedReservation()); + } + + // Test that ExtractBuffer() accounts correctly for pin count > 1. + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); + uint8_t* page_data = page.data(); + ASSERT_OK(pool.Pin(&client, &page)); + ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation()); + pool.ExtractBuffer(&client, &page, &buffer); + ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation()); + ASSERT_FALSE(page.is_open()); + ASSERT_TRUE(buffer.is_open()); + ASSERT_EQ(TEST_BUFFER_LEN, buffer.len()); + ASSERT_EQ(page_data, buffer.data()); + pool.FreeBuffer(&client, &buffer); + ASSERT_EQ(0, client_tracker->GetUsedReservation()); + + // Test that ExtractBuffer() DCHECKs for unpinned pages. 
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); + pool.Unpin(&client, &page); + IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), ""); + pool.DestroyPage(&client, &page); + + pool.DeregisterClient(&client); + client_tracker->Close(); +} + +// Test concurrent creation and destruction of pages. +TEST_F(BufferPoolTest, ConcurrentPageCreation) { + int ops_per_thread = 1024; + int num_threads = 64; + // Need enough buffers for all initial reservations. + int total_mem = num_threads * TEST_BUFFER_LEN; + global_reservations_.InitRootTracker(NULL, total_mem); + + BufferPool pool(TEST_BUFFER_LEN, total_mem); + + // Launch threads, each with a different set of query IDs. + thread_group workers; + for (int i = 0; i < num_threads; ++i) { + workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool, + &global_reservations_, ops_per_thread))); + } + + // Build debug string to test concurrent iteration over pages_ list. + for (int i = 0; i < 64; ++i) { + LOG(INFO) << pool.DebugString(); + } + workers.join_all(); + + // All the reservations should be released at this point. 
+ DCHECK_EQ(global_reservations_.GetChildReservations(), 0); + global_reservations_.Close(); +} + +void BufferPoolTest::CreatePageLoop( + BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) { + ReservationTracker client_tracker; + client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN); + BufferPool::Client client; + ASSERT_OK(pool->RegisterClient("test client", &client_tracker, NewProfile(), &client)); + for (int i = 0; i < num_ops; ++i) { + BufferPool::PageHandle handle; + ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN)); + ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle)); + pool->Unpin(&client, &handle); + ASSERT_OK(pool->Pin(&client, &handle)); + pool->DestroyPage(&client, &handle); + client_tracker.DecreaseReservation(TEST_BUFFER_LEN); + } + pool->DeregisterClient(&client); + client_tracker.Close(); +} + +/// Test error path where pool is unable to fulfill a reservation because it cannot evict +/// unpinned pages. +TEST_F(BufferPoolTest, CapacityExhausted) { + global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN); + // TODO: once we enable spilling, set up buffer pool so that spilling is disabled. + // Set up pool with one more buffer than reservations (so that we hit the reservation + // limit instead of the buffer limit). + BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2); + + BufferPool::PageHandle handle1, handle2, handle3; + + BufferPool::Client client; + ASSERT_OK( + pool.RegisterClient("test client", &global_reservations_, NewProfile(), &client)); + ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN)); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); + + // Do not have enough reservations because we pinned the page. + IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), ""); + + // Even with reservations, we can only create one more unpinned page because we don't + // support eviction yet. 
+ pool.Unpin(&client, &handle1); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)); + pool.Unpin(&client, &handle2); + ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3).ok()); + + // After destroying a page, we should have a free buffer that we can use. + pool.DestroyPage(&client, &handle1); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3)); + + pool.DestroyPage(&client, &handle2); + pool.DestroyPage(&client, &handle3); + pool.DeregisterClient(&client); +} +} + +IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-pool.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc new file mode 100644 index 0000000..1960bb0 --- /dev/null +++ b/be/src/runtime/bufferpool/buffer-pool.cc @@ -0,0 +1,439 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/bufferpool/buffer-pool.h" + +#include <boost/bind.hpp> +#include <limits> +#include <sstream> + +#include "runtime/bufferpool/reservation-tracker.h" +#include "common/names.h" +#include "gutil/strings/substitute.h" +#include "util/bit-util.h" +#include "util/runtime-profile-counters.h" +#include "util/uid-util.h" + +namespace impala { + +/// The internal representation of a page, which can be pinned or unpinned. If the +/// page is pinned, a buffer is associated with the page. +/// +/// Code manipulating the page is responsible for acquiring 'lock' when reading or +/// modifying the page. +struct BufferPool::Page : public BufferPool::PageList::Node { + Page(int64_t len) : len(len), pin_count(0), dirty(false) {} + + /// Increment the pin count. Caller must hold 'lock'. + void IncrementPinCount(PageHandle* handle) { + lock.DCheckLocked(); + ++pin_count; + // Pinned page buffers may be modified by anyone with a pointer to the buffer, so we + // have to assume they are dirty. + dirty = true; + } + + /// Decrement the pin count. Caller must hold 'lock'. + void DecrementPinCount(PageHandle* handle) { + lock.DCheckLocked(); + DCHECK(pin_count > 0); + --pin_count; + } + + string DebugString() { + return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 dirty: $4", this, + len, pin_count, buffer.DebugString(), dirty); + } + + // Helper for BufferPool::DebugString(). + static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) { + lock_guard<SpinLock> pl(page->lock); + (*ss) << page->DebugString() << "\n"; + return true; + } + + /// The length of the page in bytes. + const int64_t len; + + /// Lock to protect the below members of Page. The lock must be held when modifying any + /// of the below members and when reading any of the below members of an unpinned page. + SpinLock lock; + + /// The pin count of the page. + int pin_count; + + /// Buffer with the page's contents, Always open if pinned. 
Closed if page is unpinned + /// and was evicted from memory. + BufferHandle buffer; + + /// True if the buffer's contents need to be saved before evicting it from memory. + bool dirty; +}; + +BufferPool::BufferHandle::BufferHandle() { + Reset(); +} + +BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) { + *this = std::move(src); +} + +BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& src) { + DCHECK(!is_open()); + // Copy over all members then close src. + client_ = src.client_; + data_ = src.data_; + len_ = src.len_; + src.Reset(); + return *this; +} + +void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, int64_t len) { + client_ = client; + data_ = data; + len_ = len; +} + +void BufferPool::BufferHandle::Reset() { + client_ = NULL; + data_ = NULL; + len_ = -1; +} + +BufferPool::PageHandle::PageHandle() { + Reset(); +} + +BufferPool::PageHandle::PageHandle(PageHandle&& src) { + *this = std::move(src); +} + +BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) { + DCHECK(!is_open()); + // Copy over all members then close src. + page_ = src.page_; + client_ = src.client_; + src.Reset(); + return *this; +} + +void BufferPool::PageHandle::Open(Page* page, Client* client) { + DCHECK(!is_open()); + page->lock.DCheckLocked(); + page_ = page; + client_ = client; +} + +void BufferPool::PageHandle::Reset() { + page_ = NULL; + client_ = NULL; +} + +int BufferPool::PageHandle::pin_count() const { + DCHECK(is_open()); + // The pin count can only be modified via this PageHandle, which must not be + // concurrently accessed by multiple threads, so it is safe to access without locking + return page_->pin_count; +} + +int64_t BufferPool::PageHandle::len() const { + DCHECK(is_open()); + // The length of the page cannot change, so it is safe to access without locking. 
+ return page_->len; +} + +const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const { + DCHECK(is_pinned()); + // The 'buffer' field cannot change while the page is pinned, so it is safe to access + // without locking. + return &page_->buffer; +} + +BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit) + : allocator_(new BufferAllocator(min_buffer_len)), + min_buffer_len_(min_buffer_len), + buffer_bytes_limit_(buffer_bytes_limit), + buffer_bytes_remaining_(buffer_bytes_limit) { + DCHECK_GT(min_buffer_len, 0); + DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len)); +} + +BufferPool::~BufferPool() { + DCHECK(pages_.empty()); +} + +Status BufferPool::RegisterClient( + const string& name, ReservationTracker* reservation, RuntimeProfile* profile, + Client* client) { + DCHECK(!client->is_registered()); + DCHECK(reservation != NULL); + client->InitCounters(profile); + client->reservation_ = reservation; + client->name_ = name; + return Status::OK(); +} + +void BufferPool::Client::InitCounters(RuntimeProfile* profile) { + counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime"); + counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadWaitTime"); + counters_.write_wait_time = ADD_TIMER(profile, "BufferPoolWriteWaitTime"); + counters_.peak_unpinned_bytes = + profile->AddHighWaterMarkCounter("BufferPoolPeakUnpinnedBytes", TUnit::BYTES); + counters_.total_unpinned_bytes = + ADD_COUNTER(profile, "BufferPoolTotalUnpinnedBytes", TUnit::BYTES); +} + +void BufferPool::DeregisterClient(Client* client) { + if (!client->is_registered()) return; + client->reservation_->Close(); + client->name_.clear(); + client->reservation_ = NULL; +} + +Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) { + DCHECK(!handle->is_open()); + DCHECK_GE(len, min_buffer_len_); + DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); + + BufferHandle buffer; + // No changes have been made to state yet, 
so we can cleanly return on error. + RETURN_IF_ERROR(AllocateBufferInternal(client, len, &buffer)); + + Page* page = new Page(len); + { + lock_guard<SpinLock> pl(page->lock); + page->buffer = std::move(buffer); + handle->Open(page, client); + page->IncrementPinCount(handle); + } + + // Only add to globally-visible list after page is initialized. The page lock also + // needs to be released before enqueueing to respect the lock ordering. + pages_.Enqueue(page); + + client->reservation_->AllocateFrom(len); + return Status::OK(); +} + +void BufferPool::DestroyPage(Client* client, PageHandle* handle) { + if (!handle->is_open()) return; // DestroyPage() should be idempotent. + + Page* page = handle->page_; + if (handle->is_pinned()) { + // In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work + // of cleaning up the page and freeing the buffer. + BufferHandle buffer; + ExtractBuffer(client, handle, &buffer); + FreeBuffer(client, &buffer); + return; + } + + { + lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state. + // In the unpinned case, no reservation is consumed, so just free the buffer. + // TODO: wait for in-flight writes for 'page' so we can safely free 'page'. + if (page->buffer.is_open()) FreeBufferInternal(&page->buffer); + } + CleanUpPage(handle); +} + +void BufferPool::CleanUpPage(PageHandle* handle) { + // Remove the destroyed page from data structures in a way that ensures no other + // threads have a remaining reference. Threads that access pages via the 'pages_' + // list hold 'pages_.lock_', so Remove() will not return until those threads are done + // and it is safe to delete page. 
+ pages_.Remove(handle->page_); + delete handle->page_; + handle->Reset(); +} + +Status BufferPool::Pin(Client* client, PageHandle* handle) { + DCHECK(client->is_registered()); + DCHECK(handle->is_open()); + DCHECK_EQ(handle->client_, client); + + Page* page = handle->page_; + { + lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state. + if (page->pin_count == 0) { + if (!page->buffer.is_open()) { + // No changes have been made to state yet, so we can cleanly return on error. + RETURN_IF_ERROR(AllocateBufferInternal(client, page->len, &page->buffer)); + + // TODO: will need to initiate/wait for read if the page is not in-memory. + } + COUNTER_ADD(client->counters_.peak_unpinned_bytes, -handle->len()); + } + page->IncrementPinCount(handle); + } + + client->reservation_->AllocateFrom(page->len); + return Status::OK(); +} + +void BufferPool::Unpin(Client* client, PageHandle* handle) { + DCHECK(handle->is_open()); + lock_guard<SpinLock> pl(handle->page_->lock); + UnpinLocked(client, handle); +} + +void BufferPool::UnpinLocked(Client* client, PageHandle* handle) { + DCHECK(client->is_registered()); + DCHECK_EQ(handle->client_, client); + // If handle is pinned, we can assume that the page itself is pinned. + DCHECK(handle->is_pinned()); + Page* page = handle->page_; + page->lock.DCheckLocked(); + + page->DecrementPinCount(handle); + client->reservation_->ReleaseTo(page->len); + + COUNTER_ADD(client->counters_.total_unpinned_bytes, handle->len()); + COUNTER_ADD(client->counters_.peak_unpinned_bytes, handle->len()); + + // TODO: can evict now. Only need to preserve contents if 'page->dirty' is true. +} + +void BufferPool::ExtractBuffer( + Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) { + DCHECK(page_handle->is_pinned()); + + DCHECK_EQ(page_handle->client_, client); + + Page* page = page_handle->page_; + { + lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state. 
+ // TODO: wait for in-flight writes for 'page' so we can safely free 'page'. + + // Bring the pin count to 1 so that we're not using surplus reservations. + while (page->pin_count > 1) UnpinLocked(client, page_handle); + *buffer_handle = std::move(page->buffer); + } + CleanUpPage(page_handle); +} + +Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* handle) { + client->reservation_->AllocateFrom(len); + return AllocateBufferInternal(client, len, handle); +} + +Status BufferPool::AllocateBufferInternal( + Client* client, int64_t len, BufferHandle* buffer) { + DCHECK(!buffer->is_open()); + DCHECK_GE(len, min_buffer_len_); + DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); + SCOPED_TIMER(client->counters_.get_buffer_time); + + // If there is headroom in 'buffer_bytes_remaining_', we can just allocate a new buffer. + if (TryDecreaseBufferBytesRemaining(len)) { + uint8_t* data; + Status status = allocator_->Allocate(len, &data); + if (!status.ok()) { + buffer_bytes_remaining_.Add(len); + return status; + } + DCHECK(data != NULL); + buffer->Open(client, data, len); + return Status::OK(); + } + + // If there is no remaining capacity, we must evict another page. + return Status(TErrorCode::NOT_IMPLEMENTED_ERROR, + Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page eviction is " + "not implemented yet!", buffer_bytes_limit_)); +} + +void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) { + if (!handle->is_open()) return; // Should be idempotent. 
+ DCHECK_EQ(client, handle->client_); + client->reservation_->ReleaseTo(handle->len_); + FreeBufferInternal(handle); +} + +void BufferPool::FreeBufferInternal(BufferHandle* handle) { + DCHECK(handle->is_open()); + allocator_->Free(handle->data(), handle->len()); + buffer_bytes_remaining_.Add(handle->len()); + handle->Reset(); +} + +Status BufferPool::TransferBuffer( + Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* dst) { + DCHECK(src->is_open()); + DCHECK(!dst->is_open()); + DCHECK_EQ(src_client, src->client_); + DCHECK_NE(src, dst); + DCHECK_NE(src_client, dst_client); + + dst_client->reservation_->AllocateFrom(src->len()); + src_client->reservation_->ReleaseTo(src->len()); + *dst = std::move(*src); + dst->client_ = dst_client; + return Status::OK(); +} + +bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) { + // TODO: we may want to change this policy so that we don't always use up to the limit + // for buffers, since this may starve other operators using non-buffer-pool memory. 
+ while (true) { + int64_t old_value = buffer_bytes_remaining_.Load(); + if (old_value < len) return false; + int64_t new_value = old_value - len; + if (buffer_bytes_remaining_.CompareAndSwap(old_value, new_value)) { + return true; + } + } +} + +string BufferPool::Client::DebugString() const { + if (is_registered()) { + return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", this, name_, + reservation_->DebugString()); + } else { + return Substitute("<BufferPool::Client> $0 UNREGISTERED", this); + } +} + +string BufferPool::PageHandle::DebugString() const { + if (is_open()) { + lock_guard<SpinLock> pl(page_->lock); + return Substitute( + "<BufferPool::PageHandle> $0 client: {$1} page: {$2}", + this, client_->DebugString(), page_->DebugString()); + } else { + return Substitute("<BufferPool::PageHandle> $0 CLOSED", this); + } +} + +string BufferPool::BufferHandle::DebugString() const { + if (is_open()) { + return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 len: $3", this, + client_->DebugString(), data_, len_); + } else { + return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this); + } +} + +string BufferPool::DebugString() { + stringstream ss; + ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_ + << " buffer_bytes_limit: " << buffer_bytes_limit_ + << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n"; + pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1)); + return ss.str(); +} +}
