Repository: incubator-impala
Updated Branches:
  refs/heads/master a16a0fa84 -> 5ec1984f5


IMPALA-5238: transfer reservations between trackers

This is a primitive needed to implement claiming and distribution
of initial reservations. It supports transferring reservation between
any two ReservationTrackers under the same query.

Also remove the public DecreaseReservation() method, which is now
mostly redundant and which we have no plans to use.

Testing:
* Added a test that exercises transfer between trackers at different
  levels and with different relationships.

Change-Id: I21f008abaf1aa4fcd2d854769a603b97589af3b3
Reviewed-on: http://gerrit.cloudera.org:8080/6708
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5ec1984f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5ec1984f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5ec1984f

Branch: refs/heads/master
Commit: 5ec1984f56a2010fb9ca98bc58ca0c9ef83b2391
Parents: a16a0fa
Author: Tim Armstrong <[email protected]>
Authored: Wed Apr 19 16:49:11 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri May 12 00:46:55 2017 +0000

----------------------------------------------------------------------
 .../bufferpool/reservation-tracker-test.cc      | 141 +++++++++++++---
 .../runtime/bufferpool/reservation-tracker.cc   | 164 ++++++++++++++-----
 be/src/runtime/bufferpool/reservation-tracker.h |  66 ++++++--
 3 files changed, 296 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5ec1984f/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc 
b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index 416a53e..a794a91 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -141,13 +141,7 @@ TEST_F(ReservationTrackerTest, BasicTwoLevel) {
   child.ReleaseTo(1);
   ASSERT_EQ(0, child.GetUsedReservation());
 
-  // Child reservation should be returned all the way up the tree.
-  child.DecreaseReservation(1);
-  ASSERT_EQ(child_reservation, root_.GetReservation());
-  ASSERT_EQ(child_reservation - 1, child.GetReservation());
-  ASSERT_EQ(child_reservation - 1, root_.GetChildReservations());
-
-  // Closing the child should release its reservation.
+  // Closing the child should release its reservation all the way up the tree.
   child.Close();
   ASSERT_EQ(1, root_.GetReservation());
   ASSERT_EQ(0, root_.GetChildReservations());
@@ -284,15 +278,12 @@ TEST_F(ReservationTrackerTest, 
MemTrackerIntegrationTwoLevel) {
   ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
 
   // Check that released memory is decremented from all trackers correctly.
-  child_reservations1.DecreaseReservation(MIN_BUFFER_LEN);
-  child_reservations2.DecreaseReservation(MIN_BUFFER_LEN);
-  ASSERT_EQ(0, child_reservations2.GetReservation());
+  child_reservations1.Close();
+  child_reservations2.Close();
+  ASSERT_EQ(0, child_mem_tracker1.consumption());
   ASSERT_EQ(0, child_mem_tracker2.consumption());
   ASSERT_EQ(0, root_mem_tracker.consumption());
   ASSERT_EQ(0, root_.GetUsedReservation());
-
-  child_reservations1.Close();
-  child_reservations2.Close();
   child_mem_tracker1.UnregisterFromParent();
   child_mem_tracker2.UnregisterFromParent();
 }
@@ -315,8 +306,6 @@ TEST_F(ReservationTrackerTest, 
MemTrackerIntegrationMultiLevel) {
   for (int i = 1; i < HIERARCHY_DEPTH; ++i) {
     mem_trackers[i].reset(new MemTracker(
         mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get()));
-    reservations[i].InitChildTracker(
-        NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500);
   }
 
   vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1});
@@ -326,6 +315,10 @@ TEST_F(ReservationTrackerTest, 
MemTrackerIntegrationMultiLevel) {
   for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
     int64_t lowest_limit = mem_trackers[level]->lowest_limit();
     for (int amount : interesting_amounts) {
+      // Initialize the tracker, increase reservation, then release 
reservation by closing
+      // the tracker.
+      reservations[level].InitChildTracker(
+          NewProfile(), &reservations[level - 1], mem_trackers[level].get(), 
500);
       bool increased = reservations[level].IncreaseReservation(amount);
       if (lowest_limit == -1 || amount <= lowest_limit) {
         // The increase should go through.
@@ -338,12 +331,12 @@ TEST_F(ReservationTrackerTest, 
MemTrackerIntegrationMultiLevel) {
         }
 
         LOG(INFO) << "\n" << mem_trackers[0]->LogUsage();
-        reservations[level].DecreaseReservation(amount);
+        reservations[level].Close();
       } else {
         ASSERT_FALSE(increased);
       }
-      // We should be back in the original state.
-      for (int i = 0; i < HIERARCHY_DEPTH; ++i) {
+      // Reservations should be released on all ancestors.
+      for (int i = 0; i < level; ++i) {
         ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
                                                        << 
reservations[i].DebugString();
         ASSERT_EQ(0, reservations[i].GetChildReservations());
@@ -365,7 +358,8 @@ TEST_F(ReservationTrackerTest, 
MemTrackerIntegrationMultiLevel) {
       ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
       ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
     }
-    reservations[level].DecreaseReservation(amount);
+    // Return the reservation to the root before the next iteration.
+    reservations[level].TransferReservationTo(&reservations[0], amount);
   }
 
   for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) {
@@ -373,6 +367,115 @@ TEST_F(ReservationTrackerTest, 
MemTrackerIntegrationMultiLevel) {
     if (i != 0) mem_trackers[i]->UnregisterFromParent();
   }
 }
+
+// Test TransferReservation().
+TEST_F(ReservationTrackerTest, TransferReservation) {
+  Status status;
+  // Set up this hierarchy, to test transfers between different levels and
+  // different cases (limits are in multiples of MIN_BUFFER_LEN):
+  //    (root) limit = 4
+  //      ^
+  //      |
+  //  (grandparent) limit = 4
+  //   ^         ^
+  //   |         |
+  //  (parent) (aunt) limit = 2
+  //   ^
+  //   |
+  //  (child)
+  const int64_t TOTAL_MEM = MIN_BUFFER_LEN * 4;
+  const int64_t GRANDPARENT_LIMIT = MIN_BUFFER_LEN * 3;
+  const int64_t AUNT_LIMIT = MIN_BUFFER_LEN * 2;
+
+  root_.InitRootTracker(nullptr, TOTAL_MEM);
+  MemTracker* root_mem_tracker = obj_pool_.Add(new MemTracker);
+  ReservationTracker* grandparent = obj_pool_.Add(new ReservationTracker());
+  MemTracker* grandparent_mem_tracker =
+      obj_pool_.Add(new MemTracker(TOTAL_MEM, "grandparent", 
root_mem_tracker));
+  ReservationTracker* parent = obj_pool_.Add(new ReservationTracker());
+  MemTracker* parent_mem_tracker =
+      obj_pool_.Add(new MemTracker(-1, "parent", grandparent_mem_tracker));
+  ReservationTracker* aunt = obj_pool_.Add(new ReservationTracker());
+  ReservationTracker* child = obj_pool_.Add(new ReservationTracker());
+  MemTracker* child_mem_tracker =
+      obj_pool_.Add(new MemTracker(-1, "child", parent_mem_tracker));
+  grandparent->InitChildTracker(nullptr, &root_, grandparent_mem_tracker, 
TOTAL_MEM);
+  parent->InitChildTracker(
+      nullptr, grandparent, parent_mem_tracker, 
numeric_limits<int64_t>::max());
+  aunt->InitChildTracker(nullptr, grandparent, nullptr, AUNT_LIMIT);
+  child->InitChildTracker(
+      nullptr, parent, child_mem_tracker, numeric_limits<int64_t>::max());
+
+  ASSERT_TRUE(child->IncreaseReservation(GRANDPARENT_LIMIT));
+  // Transfer from child to self (no-op).
+  ASSERT_TRUE(child->TransferReservationTo(child, GRANDPARENT_LIMIT));
+  EXPECT_EQ(GRANDPARENT_LIMIT, child->GetReservation());
+
+  // Transfer from child to parent.
+  ASSERT_TRUE(child->TransferReservationTo(parent, GRANDPARENT_LIMIT));
+  EXPECT_EQ(0, child->GetReservation());
+  EXPECT_EQ(0, child_mem_tracker->consumption());
+  EXPECT_EQ(0, parent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, parent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, parent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, root_.GetReservation());
+
+  // Transfer from parent to aunt, up to aunt's limit.
+  ASSERT_TRUE(parent->TransferReservationTo(aunt, AUNT_LIMIT));
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, parent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, parent_mem_tracker->consumption());
+  EXPECT_EQ(AUNT_LIMIT, aunt->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, root_.GetReservation());
+  // Cannot exceed aunt's limit by transferring.
+  ASSERT_FALSE(parent->TransferReservationTo(aunt, parent->GetReservation()));
+
+  // Transfer from parent to child.
+  ASSERT_TRUE(parent->TransferReservationTo(child, parent->GetReservation()));
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, child->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, child_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, parent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT - AUNT_LIMIT, parent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+
+  // Transfer from aunt to child.
+  ASSERT_TRUE(aunt->TransferReservationTo(child, AUNT_LIMIT));
+  EXPECT_EQ(GRANDPARENT_LIMIT, child->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, child_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, parent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, parent_mem_tracker->consumption());
+  EXPECT_EQ(0, aunt->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+
+  // Transfer from child to grandparent.
+  ASSERT_TRUE(child->TransferReservationTo(grandparent, GRANDPARENT_LIMIT));
+  EXPECT_EQ(0, child->GetReservation());
+  EXPECT_EQ(0, child_mem_tracker->consumption());
+  EXPECT_EQ(0, parent->GetReservation());
+  EXPECT_EQ(0, parent_mem_tracker->consumption());
+  EXPECT_EQ(0, aunt->GetReservation());
+  EXPECT_EQ(0, grandparent->GetChildReservations());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent->GetReservation());
+  EXPECT_EQ(GRANDPARENT_LIMIT, grandparent_mem_tracker->consumption());
+  EXPECT_EQ(GRANDPARENT_LIMIT, root_.GetReservation());
+
+  child->Close();
+  child_mem_tracker->UnregisterFromParent();
+  aunt->Close();
+  parent->Close();
+  parent_mem_tracker->UnregisterFromParent();
+  grandparent->Close();
+  grandparent_mem_tracker->UnregisterFromParent();
+}
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5ec1984f/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc 
b/be/src/runtime/bufferpool/reservation-tracker.cc
index 7138d69..13cd67c 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -29,7 +29,7 @@
 
 namespace impala {
 
-ReservationTracker::ReservationTracker() : initialized_(false), 
mem_tracker_(NULL) {}
+ReservationTracker::ReservationTracker() : initialized_(false), 
mem_tracker_(nullptr) {}
 
 ReservationTracker::~ReservationTracker() {
   DCHECK(!initialized_);
@@ -39,8 +39,8 @@ void ReservationTracker::InitRootTracker(
     RuntimeProfile* profile, int64_t reservation_limit) {
   lock_guard<SpinLock> l(lock_);
   DCHECK(!initialized_);
-  parent_ = NULL;
-  mem_tracker_ = NULL;
+  parent_ = nullptr;
+  mem_tracker_ = nullptr;
   reservation_limit_ = reservation_limit;
   reservation_ = 0;
   used_reservation_ = 0;
@@ -55,7 +55,7 @@ void ReservationTracker::InitRootTracker(
 
 void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
     ReservationTracker* parent, MemTracker* mem_tracker, int64_t 
reservation_limit) {
-  DCHECK(parent != NULL);
+  DCHECK(parent != nullptr);
   DCHECK_GE(reservation_limit, 0);
 
   lock_guard<SpinLock> l(lock_);
@@ -69,9 +69,9 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* 
profile,
   child_reservations_ = 0;
   initialized_ = true;
 
-  if (mem_tracker_ != NULL) {
+  if (mem_tracker_ != nullptr) {
     MemTracker* parent_mem_tracker = GetParentMemTracker();
-    if (parent_mem_tracker != NULL) {
+    if (parent_mem_tracker != nullptr) {
       // Make sure the parent links of the MemTrackers correspond to our 
parent links.
       DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
       // Make sure we don't have a lower limit than the ancestor, since we 
don't enforce
@@ -81,8 +81,8 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* 
profile,
       // Make sure we didn't leave a gap in the links. E.g. this tracker's 
grandparent
       // shouldn't have a MemTracker.
       ReservationTracker* ancestor = parent_;
-      while (ancestor != NULL) {
-        DCHECK(ancestor->mem_tracker_ == NULL);
+      while (ancestor != nullptr) {
+        DCHECK(ancestor->mem_tracker_ == nullptr);
         ancestor = ancestor->parent_;
       }
     }
@@ -95,14 +95,13 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* 
profile,
 
 void ReservationTracker::InitCounters(
     RuntimeProfile* profile, int64_t reservation_limit) {
-  bool profile_provided = profile != NULL;
-  if (profile == NULL) {
+  if (profile == nullptr) {
     dummy_profile_.reset(new DummyProfile);
     profile = dummy_profile_->profile();
   }
 
   // Check that another tracker's counters aren't already registered in the 
profile.
-  DCHECK(profile->GetCounter("BufferPoolInitialReservation") == NULL);
+  DCHECK(profile->GetCounter("BufferPoolInitialReservation") == nullptr);
   counters_.reservation_limit =
       ADD_COUNTER(profile, "BufferPoolReservationLimit", TUnit::BYTES);
   counters_.peak_reservation =
@@ -112,9 +111,7 @@ void ReservationTracker::InitCounters(
 
   COUNTER_SET(counters_.reservation_limit, reservation_limit);
 
-  if (mem_tracker_ != NULL && profile_provided) {
-    mem_tracker_->EnableReservationReporting(counters_);
-  }
+  if (mem_tracker_ != nullptr) 
mem_tracker_->EnableReservationReporting(counters_);
 }
 
 void ReservationTracker::Close() {
@@ -124,9 +121,9 @@ void ReservationTracker::Close() {
   DCHECK_EQ(used_reservation_, 0);
   DCHECK_EQ(child_reservations_, 0);
   // Release any reservation to parent.
-  if (parent_ != NULL) DecreaseReservationInternalLocked(reservation_, false);
-  mem_tracker_ = NULL;
-  parent_ = NULL;
+  if (parent_ != nullptr) DecreaseReservationLocked(reservation_, false);
+  mem_tracker_ = nullptr;
+  parent_ = nullptr;
   initialized_ = false;
 }
 
@@ -154,17 +151,17 @@ bool 
ReservationTracker::IncreaseReservationInternalLocked(
   } else if (reservation_increase == 0) {
     granted = true;
   } else {
-    if (parent_ == NULL) {
+    if (parent_ == nullptr) {
       granted = true;
     } else {
       lock_guard<SpinLock> l(parent_->lock_);
       granted =
           parent_->IncreaseReservationInternalLocked(reservation_increase, 
true, true);
     }
-    if (granted && !TryUpdateMemTracker(reservation_increase)) {
+    if (granted && !TryConsumeFromMemTracker(reservation_increase)) {
       granted = false;
       // Roll back changes to ancestors if MemTracker update fails.
-      parent_->DecreaseReservationInternal(reservation_increase, true);
+      parent_->DecreaseReservation(reservation_increase, true);
     }
   }
 
@@ -179,9 +176,10 @@ bool ReservationTracker::IncreaseReservationInternalLocked(
   return granted;
 }
 
-bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase) {
-  if (mem_tracker_ == NULL) return true;
-  if (GetParentMemTracker() == NULL) {
+bool ReservationTracker::TryConsumeFromMemTracker(int64_t 
reservation_increase) {
+  DCHECK_GE(reservation_increase, 0);
+  if (mem_tracker_ == nullptr) return true;
+  if (GetParentMemTracker() == nullptr) {
     // At the topmost link, which may be a MemTracker with a limit, we need to 
use
     // TryConsume() to check the limit.
     return mem_tracker_->TryConsume(reservation_increase);
@@ -194,35 +192,125 @@ bool ReservationTracker::TryUpdateMemTracker(int64_t 
reservation_increase) {
   }
 }
 
-void ReservationTracker::DecreaseReservation(int64_t bytes) {
-  DecreaseReservationInternal(bytes, false);
+void ReservationTracker::ReleaseToMemTracker(int64_t reservation_decrease) {
+  DCHECK_GE(reservation_decrease, 0);
+  if (mem_tracker_ == nullptr) return;
+  if (GetParentMemTracker() == nullptr) {
+    mem_tracker_->Release(reservation_decrease);
+  } else {
+    mem_tracker_->ReleaseLocal(reservation_decrease, GetParentMemTracker());
+  }
 }
 
-void ReservationTracker::DecreaseReservationInternal(
-    int64_t bytes, bool is_child_reservation) {
+void ReservationTracker::DecreaseReservation(int64_t bytes, bool 
is_child_reservation) {
   lock_guard<SpinLock> l(lock_);
-  DecreaseReservationInternalLocked(bytes, is_child_reservation);
+  DecreaseReservationLocked(bytes, is_child_reservation);
 }
 
-void ReservationTracker::DecreaseReservationInternalLocked(
+void ReservationTracker::DecreaseReservationLocked(
     int64_t bytes, bool is_child_reservation) {
   DCHECK(initialized_);
   DCHECK_GE(reservation_, bytes);
   if (bytes == 0) return;
   if (is_child_reservation) child_reservations_ -= bytes;
   UpdateReservation(-bytes);
+  ReleaseToMemTracker(bytes);
   // The reservation should be returned up the tree.
-  if (mem_tracker_ != NULL) {
-    if (GetParentMemTracker() == NULL) {
-      mem_tracker_->Release(bytes);
-    } else {
-      mem_tracker_->ReleaseLocal(bytes, GetParentMemTracker());
-    }
-  }
-  if (parent_ != NULL) parent_->DecreaseReservationInternal(bytes, true);
+  if (parent_ != nullptr) parent_->DecreaseReservation(bytes, true);
   CheckConsistency();
 }
 
+bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_t bytes) {
+  if (other == this) return true;
+  // Find the path to the root from both. The root is guaranteed to be a common ancestor.
+  vector<ReservationTracker*> path_to_common = FindPathToRoot();
+  vector<ReservationTracker*> other_path_to_common = other->FindPathToRoot();
+  DCHECK_EQ(path_to_common.back(), other_path_to_common.back());
+  ReservationTracker* common_ancestor = path_to_common.back();
+  // Remove any common ancestors - they do not need to be updated for this transfer.
+  while (!path_to_common.empty() && !other_path_to_common.empty()
+      && path_to_common.back() == other_path_to_common.back()) {
+    common_ancestor = path_to_common.back();
+    path_to_common.pop_back();
+    other_path_to_common.pop_back();
+  }
+
+  // At this point, we have three cases:
+  // 1. 'common_ancestor' == 'other'. 'other_path_to_common' is empty because 'other' is
+  //    the lowest common ancestor. To transfer, we decrease the reservation on the
+  //    trackers under 'other', down to 'this'.
+  // 2. 'common_ancestor' == 'this'. 'path_to_common' is empty because 'this' is the
+  //    lowest common ancestor. To transfer, we increase the reservation on the trackers
+  //    under 'this', down to 'other'.
+  // 3. Neither is an ancestor of the other. Both 'other_path_to_common' and
+  //    'path_to_common' are non-empty. We increase the reservation on trackers from
+  //    'other' up to one below the common ancestor (checking limits as needed) and if
+  //    successful, decrease reservations on trackers from 'this' up to one below the
+  //    common ancestor.
+
+  // Lock all of the trackers so we can do the update atomically. Need to be careful to
+  // lock subtrees in the correct order.
+  vector<unique_lock<SpinLock>> locks;
+  bool lock_first = path_to_common.empty() || other_path_to_common.empty()
+      || lock_sibling_subtree_first(path_to_common.back(), other_path_to_common.back());
+  if (lock_first) {
+    for (ReservationTracker* tracker : path_to_common) locks.emplace_back(tracker->lock_);
+  }
+  for (ReservationTracker* tracker : other_path_to_common) {
+    locks.emplace_back(tracker->lock_);
+  }
+  if (!lock_first) {
+    for (ReservationTracker* tracker : path_to_common) locks.emplace_back(tracker->lock_);
+  }
+
+  // Check limits before any updates; compare against headroom to avoid int64 overflow.
+  for (ReservationTracker* tracker : other_path_to_common) {
+    if (bytes > tracker->reservation_limit_ - tracker->reservation_) return false;
+  }
+
+  // Do the updates now that we have checked the limits. We're holding all the locks
+  // so this is all atomic.
+  for (ReservationTracker* tracker : other_path_to_common) {
+    tracker->UpdateReservation(bytes);
+    // We don't handle MemTrackers with limit in this function - this should always
+    // succeed.
+    DCHECK(tracker->mem_tracker_ == nullptr || !tracker->mem_tracker_->has_limit());
+    bool success = tracker->TryConsumeFromMemTracker(bytes);
+    DCHECK(success);
+    if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes;
+  }
+  for (ReservationTracker* tracker : path_to_common) {
+    if (tracker != path_to_common[0]) tracker->child_reservations_ -= bytes;
+    tracker->UpdateReservation(-bytes);
+    tracker->ReleaseToMemTracker(bytes);
+  }
+
+  // Update the 'child_reservations_' on the common ancestor if needed.
+  // Case 1: reservation was pushed up to 'other'.
+  if (common_ancestor == other) {
+    lock_guard<SpinLock> l(other->lock_);
+    other->child_reservations_ -= bytes;
+    other->CheckConsistency();
+  }
+  // Case 2: reservation was pushed down below 'this'.
+  if (common_ancestor == this) {
+    lock_guard<SpinLock> l(lock_);
+    child_reservations_ += bytes;
+    CheckConsistency();
+  }
+  return true;
+}
+
+vector<ReservationTracker*> ReservationTracker::FindPathToRoot() {
+  vector<ReservationTracker*> path_to_root;
+  ReservationTracker* curr = this;
+  do {
+    path_to_root.push_back(curr);
+    curr = curr->parent_;
+  } while (curr != nullptr);
+  return path_to_root;
+}
+
 void ReservationTracker::AllocateFrom(int64_t bytes) {
   lock_guard<SpinLock> l(lock_);
   DCHECK(initialized_);
@@ -296,7 +384,7 @@ string ReservationTracker::DebugString() {
   lock_guard<SpinLock> l(lock_);
   if (!initialized_) return "<ReservationTracker>: uninitialized";
 
-  string parent_debug_string = parent_ == NULL ? "NULL" : 
parent_->DebugString();
+  string parent_debug_string = parent_ == nullptr ? "NULL" : 
parent_->DebugString();
   return Substitute(
       "<ReservationTracker>: reservation_limit $0 reservation $1 
used_reservation $2 "
       "child_reservations $3 parent:\n$4",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5ec1984f/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h 
b/be/src/runtime/bufferpool/reservation-tracker.h
index 3d40fcc..5be2c23 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -127,11 +127,14 @@ class ReservationTracker {
   /// Returns true if the reservation increase was successful or not necessary.
   bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
 
-  /// Decrease tracker's reservation by 'bytes'. This tracker's reservation 
must be at
-  /// least 'bytes' before calling this method.
-  /// TODO: decide on and implement policy for how far to release the 
reservation up
-  /// the tree. Currently the reservation is released all the way to the root.
-  void DecreaseReservation(int64_t bytes);
+  /// Transfer reservation from this tracker to 'other'. Both trackers must be 
in the
+  /// same query subtree of the hierarchy. One tracker can be the ancestor of 
the other,
+  /// or they can share a common ancestor. The subtree root must be at the 
query level
+  /// or below so that the transfer cannot cause a MemTracker limit to be 
exceeded
+  /// (because linked MemTrackers with limits below the query level are not 
supported).
+  /// Returns true on success or false if the transfer would have caused a 
reservation
+  /// limit to be exceeded.
+  bool TransferReservationTo(ReservationTracker* other, int64_t bytes);
 
   /// Allocate 'bytes' from the reservation. The tracker must have at least 
'bytes'
   /// unused reservation before calling this method.
@@ -181,19 +184,38 @@ class ReservationTracker {
   bool IncreaseReservationInternalLocked(
       int64_t bytes, bool use_existing_reservation, bool is_child_reservation);
 
-  /// Update consumption on linked MemTracker. For the topmost link, return 
false if
-  /// this failed because it would exceed a memory limit. If there is no linked
-  /// MemTracker, just returns true.
+  /// Increase consumption on linked MemTracker to reflect an increase in 
reservation
+  /// of 'reservation_increase'. For the topmost link, return false if this 
failed
+  /// because it would exceed a memory limit. If there is no linked 
MemTracker, just
+  /// returns true.
   /// TODO: remove once we account all memory via ReservationTrackers.
-  bool TryUpdateMemTracker(int64_t reservation_increase);
-
-  /// Internal helper for DecreaseReservation(). This behaves the same as
-  /// DecreaseReservation(), except when 'is_child_reservation' is true it 
decreases
-  /// 'child_reservations_' by 'bytes'.
-  void DecreaseReservationInternal(int64_t bytes, bool is_child_reservation);
+  bool TryConsumeFromMemTracker(int64_t reservation_increase);
 
-  /// Same as DecreaseReservationInternal(), but 'lock_' must be held by 
caller.
-  void DecreaseReservationInternalLocked(int64_t bytes, bool 
is_child_reservation);
+  /// Decrease consumption on linked MemTracker to reflect a decrease in 
reservation of
+  /// 'reservation_decrease'. If there is no linked MemTracker, does nothing.
+  /// TODO: remove once we account all memory via ReservationTrackers.
+  void ReleaseToMemTracker(int64_t reservation_decrease);
+
+  /// Decrease reservation by 'bytes' on this tracker and all ancestors. This tracker's
+  /// reservation must be at least 'bytes' before calling this method. If
+  /// 'is_child_reservation' is true it also decreases 'child_reservations_' by 'bytes'.
+  void DecreaseReservation(int64_t bytes, bool is_child_reservation);
+
+  /// Same as DecreaseReservation(), but 'lock_' must be held by caller.
+  void DecreaseReservationLocked(int64_t bytes, bool is_child_reservation);
+
+  /// Return a vector containing the trackers on the path to the root tracker. 
Includes
+  /// the current tracker and the root tracker.
+  std::vector<ReservationTracker*> FindPathToRoot();
+
+  /// Return true if trackers in the subtree rooted at 'subtree1' precede 
trackers in
+  /// the subtree rooted at 'subtree2' in the lock order. 'subtree1' and 
'subtree2'
+  /// must share the same parent.
+  static bool lock_sibling_subtree_first(
+      ReservationTracker* subtree1, ReservationTracker* subtree2) {
+    DCHECK_EQ(subtree1->parent_, subtree2->parent_);
+    return reinterpret_cast<uintptr_t>(subtree1) < 
reinterpret_cast<uintptr_t>(subtree2);
+  }
 
   /// Check the internal consistency of the ReservationTracker and DCHECKs if 
in an
   /// inconsistent state.
@@ -208,8 +230,16 @@ class ReservationTracker {
   /// 'lock_' must be held by caller.
   void UpdateReservation(int64_t delta);
 
-  /// lock_ protects all members. In a hierarchy of trackers, locks must be 
acquired
-  /// from the bottom-up.
+  /// lock_ protects all members. The lock order in a tree of ReservationTrackers is
+  /// based on a post-order traversal of the tree, with children visited in order of the
+  /// memory address of the ReservationTracker object. The following rules can be applied
+  /// to determine the relative positions of two trackers t1 and t2 in the lock order:
+  /// * If t1 is a descendant of t2, t1's lock must be acquired before t2's lock (i.e.
+  ///   locks are acquired bottom-up).
+  /// * If neither t1 nor t2 is a descendant of the other, they must be in different
+  ///   subtrees under a common ancestor. If the memory address of t1's subtree's root is
+  ///   less than the memory address of t2's subtree's root, t1's lock must be acquired
+  ///   before t2's lock. This check is implemented in lock_sibling_subtree_first().
   SpinLock lock_;
 
   /// True if the tracker is initialized.

Reply via email to