Repository: impala
Updated Branches:
  refs/heads/master b0d3433e3 -> 19ab465b3


IMPALA-6519: API to allocate unreserved buffer

The motivation is to allow allocation of buffers without reservation in
ExchangeNode. Currently this is not possible because
IncreaseReservationToFit() followed by AllocateBuffer() is non-atomic.
We need to handle concurrent allocations in ExchangeNode because there
may be multiple batches being received at a given time.

This is a temporary solution until we can implement proper reservations
in ExchangeNode (IMPALA-6524).

Testing:
Added basic unit test.

Change-Id: Ia4d17b3db25491f796484de22405fbdee7a0f983
Reviewed-on: http://gerrit.cloudera.org:8080/9250
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cce0b2de
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cce0b2de
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cce0b2de

Branch: refs/heads/master
Commit: cce0b2de28e81ffd7a9622e3e5eba7a2081a7fba
Parents: b0d3433
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Wed Feb 7 17:39:11 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Feb 15 01:35:34 2018 +0000

----------------------------------------------------------------------
 .../runtime/bufferpool/buffer-pool-internal.h   | 18 +++++--
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 50 ++++++++++++-----
 be/src/runtime/bufferpool/buffer-pool.cc        | 57 +++++++++++++++-----
 be/src/runtime/bufferpool/buffer-pool.h         | 15 ++++++
 .../runtime/bufferpool/reservation-tracker.cc   | 12 +++++
 be/src/runtime/bufferpool/reservation-tracker.h | 12 ++++-
 6 files changed, 131 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h 
b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 7094942..dee8e4f 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -238,11 +238,19 @@ class BufferPool::Client {
   /// page->pin_in_flight was set to true by StartMoveToPinned().
   Status FinishMoveEvictedToPinned(Page* page) WARN_UNUSED_RESULT;
 
-  /// Must be called once before allocating a buffer of 'len' via the 
AllocateBuffer()
-  /// API to deduct from the client's reservation and update internal 
accounting. Cleans
-  /// dirty pages if needed to satisfy the buffer pool's internal invariants. 
No page or
-  /// client locks should be held by the caller.
-  Status PrepareToAllocateBuffer(int64_t len) WARN_UNUSED_RESULT;
+  /// Must be called once before allocating a buffer of 'len' via the 
AllocateBuffer() or
+  /// AllocateUnreservedBuffer() APIs. Deducts from the client's reservation 
and updates
+  /// internal accounting. Cleans dirty pages if needed to satisfy the buffer 
pool's
+  /// internal invariants. No page or client locks should be held by the 
caller.
+  /// If 'reserved' is true, we assume that the memory is already reserved. If 
it is
+  /// false, tries to increase the reservation if needed.
+  ///
+  /// On success, returns OK and sets 'success' to true if non-NULL. If an 
error is
+  /// encountered, e.g. while cleaning pages, returns an error status. If the 
reservation
+  /// could not be increased for an unreserved allocation, returns OK and sets 
'success'
+  /// to false (for unreserved allocations, 'success' must be non-NULL).
+  Status PrepareToAllocateBuffer(
+      int64_t len, bool reserved, bool* success) WARN_UNUSED_RESULT;
 
   /// Implementation of ClientHandle::DecreaseReservationTo().
   Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc 
b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 0138a08..d6547d2 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -367,6 +367,7 @@ class BufferPoolTest : public ::testing::Test {
   }
 
   /// Parameterised test implementations.
+  void TestBufferAllocation(bool reserved);
   void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
   void TestEvictionPolicy(int64_t page_size);
   void TestCleanPageLimit(int max_clean_pages, bool randomize_core);
@@ -550,27 +551,43 @@ TEST_F(BufferPoolTest, PageCreation) {
   global_reservations_.Close();
 }
 
-TEST_F(BufferPoolTest, BufferAllocation) {
+TEST_F(BufferPoolTest, ReservedBufferAllocation) {
+  TestBufferAllocation(true);
+}
+
+TEST_F(BufferPoolTest, UnreservedBufferAllocation) {
+  TestBufferAllocation(false);
+}
+
+void BufferPoolTest::TestBufferAllocation(bool reserved) {
   // Allocate many buffers, each a power-of-two multiple of the minimum buffer 
length.
-  int num_buffers = 16;
-  int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
-  int64_t total_mem = 2 * 2 * max_buffer_len;
-  global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
+  const int NUM_BUFFERS = 16;
+  const int64_t MAX_BUFFER_LEN = TEST_BUFFER_LEN << (NUM_BUFFERS - 1);
+
+  // Total memory required to allocate TEST_BUFFER_LEN, 2*TEST_BUFFER_LEN, ...,
+  // MAX_BUFFER_LEN.
+  const int64_t TOTAL_MEM = 2 * MAX_BUFFER_LEN - TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, 
NULL,
-      total_mem, NewProfile(), &client));
-  ASSERT_TRUE(client.IncreaseReservationToFit(total_mem));
+      TOTAL_MEM, NewProfile(), &client));
+  if (reserved) ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
 
-  vector<BufferPool::BufferHandle> handles(num_buffers);
+  vector<BufferPool::BufferHandle> handles(NUM_BUFFERS);
 
   // Create buffers of various valid sizes.
   int64_t total_allocated = 0;
-  for (int i = 0; i < num_buffers; ++i) {
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
     int size_multiple = 1 << i;
     int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
     int64_t used_before = client.GetUsedReservation();
-    ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    if (reserved) {
+      ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    } else {
+      // Reservation should be automatically increased.
+      ASSERT_OK(pool.AllocateUnreservedBuffer(&client, buffer_len, 
&handles[i]));
+    }
     total_allocated += buffer_len;
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].data() != NULL);
@@ -583,8 +600,15 @@ TEST_F(BufferPoolTest, BufferAllocation) {
     EXPECT_EQ(0, pool.GetFreeBufferBytes());
   }
 
+  if (!reserved) {
+    // Allocate all of the memory and test the failure path for unreserved 
allocations.
+    BufferPool::BufferHandle tmp_handle;
+    ASSERT_OK(pool.AllocateUnreservedBuffer(&client, TEST_BUFFER_LEN, 
&tmp_handle));
+    ASSERT_FALSE(tmp_handle.is_open()) << "No reservation for buffer";
+  }
+
   // Close the handles and check memory consumption.
-  for (int i = 0; i < num_buffers; ++i) {
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
     int64_t used_before = client.GetUsedReservation();
     int buffer_len = handles[i].len();
     pool.FreeBuffer(&client, &handles[i]);
@@ -597,7 +621,7 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
   // But freed memory is not released to the system immediately.
   EXPECT_EQ(total_allocated, pool.GetSystemBytesAllocated());
-  EXPECT_EQ(num_buffers, pool.GetNumFreeBuffers());
+  EXPECT_EQ(NUM_BUFFERS, pool.GetNumFreeBuffers());
   EXPECT_EQ(total_allocated, pool.GetFreeBufferBytes());
   global_reservations_.Close();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc 
b/be/src/runtime/bufferpool/buffer-pool.cc
index 6a111ff..2d06f7b 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -226,12 +226,23 @@ Status BufferPool::ExtractBuffer(
 
 Status BufferPool::AllocateBuffer(
     ClientHandle* client, int64_t len, BufferHandle* handle) {
-  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len));
+  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len, true, nullptr));
   Status status = allocator_->Allocate(client, len, handle);
-  if (!status.ok()) {
-    // Allocation failed - update client's accounting to reflect the failure.
-    client->impl_->FreedBuffer(len);
-  }
+  // If the allocation failed, update client's accounting to reflect the 
failure.
+  if (!status.ok()) client->impl_->FreedBuffer(len);
+  return status;
+}
+
+Status BufferPool::AllocateUnreservedBuffer(
+    ClientHandle* client, int64_t len, BufferHandle* handle) {
+  DCHECK(!handle->is_open());
+  bool success;
+  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len, false, 
&success));
+  if (!success) return Status::OK(); // Leave 'handle' closed to indicate 
failure.
+
+  Status status = allocator_->Allocate(client, len, handle);
+  // If the allocation failed, update client's accounting to reflect the 
failure.
+  if (!status.ok()) client->impl_->FreedBuffer(len);
   return status;
 }
 
@@ -546,14 +557,34 @@ Status 
BufferPool::Client::FinishMoveEvictedToPinned(Page* page) {
   return Status::OK();
 }
 
-Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) {
-  unique_lock<mutex> lock(lock_);
-  // Clean enough pages to allow allocation to proceed without violating our 
eviction
-  // policy. This can fail, so only update the accounting once success is 
ensured.
-  RETURN_IF_ERROR(CleanPages(&lock, len));
-  reservation_.AllocateFrom(len);
-  buffers_allocated_bytes_ += len;
-  DCHECK_CONSISTENCY();
+Status BufferPool::Client::PrepareToAllocateBuffer(
+    int64_t len, bool reserved, bool* success) {
+  if (success != nullptr) *success = false;
+  // Don't need to hold the client's 'lock_' yet because 'reservation_' 
operations are
+  // threadsafe.
+  if (reserved) {
+    // The client must have already reserved the memory.
+    reservation_.AllocateFrom(len);
+  } else {
+    DCHECK(success != nullptr);
+    // The client may not have reserved the memory.
+    if (!reservation_.IncreaseReservationToFitAndAllocate(len)) return 
Status::OK();
+  }
+
+  {
+    unique_lock<mutex> lock(lock_);
+    // Clean enough pages to allow allocation to proceed without violating our 
eviction
+    // policy.
+    Status status = CleanPages(&lock, len);
+    if (!status.ok()) {
+      // Reverse the allocation.
+      reservation_.ReleaseTo(len);
+      return status;
+    }
+    buffers_allocated_bytes_ += len;
+    DCHECK_CONSISTENCY();
+  }
+  if (success != nullptr) *success = true;
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h 
b/be/src/runtime/bufferpool/buffer-pool.h
index 5b98579..285aacb 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -239,6 +239,21 @@ class BufferPool : public CacheLineAligned {
   Status AllocateBuffer(
       ClientHandle* client, int64_t len, BufferHandle* handle) 
WARN_UNUSED_RESULT;
 
+  /// Like AllocateBuffer(), except used when the client may not have the 
reservation
+  /// to allocate the buffer. Tries to increase reservation on the behalf of 
the client
+  /// if needed to allocate the buffer. If the reservation isn't available, 
'handle'
+  /// isn't opened and OK is returned. If an unexpected error occurs, an error 
is
+  /// returned and any reservation increase remains in effect. Safe to call 
concurrently
+  /// with any other operations for 'client', except for operations on the 
same 'handle'.
+  ///
+  /// This function is a transitional mechanism for components to allocate 
memory from
+  /// the buffer pool without implementing the reservation accounting required 
to operate
+  /// within a predetermined memory constraint. Wherever possible, clients 
should reserve
+  /// memory ahead of time and allocate out of that instead of relying on this 
"best
+  /// effort" interface.
+  Status AllocateUnreservedBuffer(
+      ClientHandle* client, int64_t len, BufferHandle* handle) 
WARN_UNUSED_RESULT;
+
   /// If 'handle' is open, close 'handle', free the buffer and decrease the 
reservation
   /// usage from 'client'. Idempotent. Safe to call concurrently with other 
operations
   /// for 'client', except for operations on the same 'handle'.

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc 
b/be/src/runtime/bufferpool/reservation-tracker.cc
index aba5dce..f0e1839 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -141,6 +141,14 @@ bool ReservationTracker::IncreaseReservationToFit(int64_t 
bytes, Status* error_s
   return IncreaseReservationInternalLocked(bytes, true, false, error_status);
 }
 
+bool ReservationTracker::IncreaseReservationToFitAndAllocate(
+    int64_t bytes, Status* error_status) {
+  lock_guard<SpinLock> l(lock_);
+  if (!IncreaseReservationInternalLocked(bytes, true, false, error_status)) 
return false;
+  AllocateFromLocked(bytes);
+  return true;
+}
+
 bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
     bool use_existing_reservation, bool is_child_reservation, Status* 
error_status) {
   DCHECK(initialized_);
@@ -359,6 +367,10 @@ vector<ReservationTracker*> 
ReservationTracker::FindPathToRoot() {
 
 void ReservationTracker::AllocateFrom(int64_t bytes) {
   lock_guard<SpinLock> l(lock_);
+  AllocateFromLocked(bytes);
+}
+
+void ReservationTracker::AllocateFromLocked(int64_t bytes) {
   DCHECK(initialized_);
   DCHECK_GE(bytes, 0);
   DCHECK_LE(bytes, unused_reservation());

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h 
b/be/src/runtime/bufferpool/reservation-tracker.h
index ff4b77e..3bf2de1 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -129,8 +129,13 @@ class ReservationTracker {
   /// Returns true if the reservation increase was successful or not 
necessary. Otherwise
   /// returns false and if 'error_status' is non-null, it returns an 
appropriate status
   /// message in it.
-  bool IncreaseReservationToFit(int64_t bytes, Status* error_status = nullptr)
-      WARN_UNUSED_RESULT;
+  bool IncreaseReservationToFit(
+      int64_t bytes, Status* error_status = nullptr) WARN_UNUSED_RESULT;
+
+  /// Like IncreaseReservationToFit(), except 'bytes' is also allocated from
+  /// the reservation on success.
+  bool IncreaseReservationToFitAndAllocate(
+      int64_t bytes, Status* error_status = nullptr) WARN_UNUSED_RESULT;
 
   /// Decrease reservation by 'bytes' on this tracker and all ancestors. This 
tracker's
   /// reservation must be at least 'bytes' before calling this method.
@@ -247,6 +252,9 @@ class ReservationTracker {
   /// 'lock_' must be held by caller.
   void CheckConsistency() const;
 
+  /// Same as AllocateFrom() except 'lock_' must be held by caller.
+  void AllocateFromLocked(int64_t bytes);
+
   /// Increase or decrease 'used_reservation_' and update profile counters 
accordingly.
   /// 'lock_' must be held by caller.
   void UpdateUsedReservation(int64_t delta);

Reply via email to