IMPALA-3203: Part 2: per-core free lists in buffer pool

Add per-core lists of clean pages and free pages to enable allocation
of buffers without contention on shared locks in the common case.

This is implemented with an additional layer of abstraction in
"BufferAllocator", which tracks all memory (free buffers and clean
pages) that is not in use but has not been released to the OS.
The old BufferAllocator is renamed to SystemAllocator.
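
To make the idea concrete, here is a minimal standalone sketch of the
per-core free list pattern (illustration only, not Impala code; all names
are invented): each core owns its own locked list of cached buffers, so a
thread normally touches only the arena of the core it is running on and
falls back to the system allocator on a miss.

    // Toy per-core free list allocator (illustration only, not Impala code).
    #include <sched.h>

    #include <cstdlib>
    #include <mutex>
    #include <vector>

    class ToyPerCoreAllocator {
     public:
      ToyPerCoreAllocator(int num_cores, size_t buffer_len)
        : arenas_(num_cores), buffer_len_(buffer_len) {}

      ~ToyPerCoreAllocator() {
        for (Arena& arena : arenas_) {
          for (void* buf : arena.free_buffers) std::free(buf);
        }
      }

      void* Allocate() {
        Arena& arena = arenas_[CurrentCore()];
        {
          // Common case: reuse a cached buffer; only this core's lock is touched.
          std::lock_guard<std::mutex> l(arena.lock);
          if (!arena.free_buffers.empty()) {
            void* buf = arena.free_buffers.back();
            arena.free_buffers.pop_back();
            return buf;
          }
        }
        // Miss: fall back to the system allocator.
        return std::malloc(buffer_len_);
      }

      void Free(void* buf) {
        // Cache the buffer on the core that frees it, for later reuse.
        Arena& arena = arenas_[CurrentCore()];
        std::lock_guard<std::mutex> l(arena.lock);
        arena.free_buffers.push_back(buf);
      }

     private:
      struct Arena {
        std::mutex lock;
        std::vector<void*> free_buffers;
      };

      int CurrentCore() const {
        int core = sched_getcpu();  // Linux-specific.
        return core < 0 ? 0 : core % static_cast<int>(arenas_.size());
      }

      std::vector<Arena> arenas_;
      size_t buffer_len_;
    };

The real BufferAllocator layers this on top of SystemAllocator, returns each
buffer to the arena of the core it was originally allocated on, also caches
clean pages, prefers the local NUMA node, and ties into the reservation
system, as described in the code below.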

See "Spilled Page Mgmt" and "MMap Allocator & Scalable Free Lists" in
https://goo.gl/0zuy97 for a high-level summary of how this fits into
the buffer pool design.

The core of the new code is BufferAllocator::AllocateInternal(), which
steps through progressively more expensive strategies for allocating memory.
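
In outline, the fallback order is:
1. Pop a free buffer of the requested size from the current core's arena
   (an atomic emptiness check, then only that arena's spinlock).
2. Allocate a fresh buffer from the system if 'system_bytes_remaining_'
   has enough headroom.
3. Pop a free buffer of the right size from another core's arena on the
   same NUMA node.
4. Evict a clean page of the right size from an arena on the same NUMA node.
5. Scavenge free buffers and clean pages of any size across all arenas;
   the first attempts are opportunistic, and the final attempt holds all
   arena locks so it is guaranteed to find the reserved memory.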

Misc changes:
* Enforce upper limit on buffer size to reduce the number of free lists
  required.
* Add additional allocation counters.
* Slightly reorganise the MemTracker GC functions to use lambdas and
  clarify the order in which they should be called. Also add a target
  amount of memory to free, so that they don't need to free *all* of the
  memory in the system (see the sketch after this list).
* Fix an accounting bug in the buffer pool where it didn't
  evict dirty pages before reclaiming a clean page.
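
For illustration, a minimal sketch of the registration-order-plus-target
pattern described above (hypothetical types and names; the actual MemTracker
API lives in mem-tracker.h, whose hunk is not shown in this excerpt):

    #include <cstdint>
    #include <functional>
    #include <utility>
    #include <vector>

    // Hypothetical sketch, not the real MemTracker API: GC functions are tried
    // in registration order and each receives only the number of bytes still
    // needed, so they do not have to free *all* reclaimable memory.
    class GcFunctionList {
     public:
      // A GC function frees up to 'bytes_to_free' bytes and returns how many it freed.
      using GcFunction = std::function<int64_t(int64_t bytes_to_free)>;

      void Add(GcFunction fn) { fns_.push_back(std::move(fn)); }

      // Invoke GC functions in order until at least 'bytes_to_free' bytes are
      // freed or all functions have been tried. Returns the total bytes freed.
      int64_t GcMemory(int64_t bytes_to_free) {
        int64_t freed = 0;
        for (const GcFunction& fn : fns_) {
          if (freed >= bytes_to_free) break;
          freed += fn(bytes_to_free - freed);
        }
        return freed;
      }

     private:
      std::vector<GcFunction> fns_;  // Ordered from cheapest to most expensive.
    };

Each function reports how much it freed, so later, more expensive functions
are only invoked when earlier ones did not reach the target.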

Performance:
We will need to validate the performance of the system under high query
concurrency before this is used as part of query execution. The benchmark
in Part 1 provided some evidence that the per-core-list approach should
scale well to many cores.

Testing:
Added buffer-allocator-test to test the free list resizing algorithm
directly.

Added a test to buffer-pool-test to exercise the various new memory
reclamation code paths that are now possible. Also ran buffer-pool-test
under two different faked-out NUMA setups - one with no NUMA and another
with three NUMA nodes.

buffer-pool-test, suballocator-test, and buffered-tuple-stream-v2-test
provide some further basic coverage. Future system and unit tests will
validate this further before it is used for query execution (see
IMPALA-3200).

Ran an initial version of IMPALA-4114, the ported BufferedBlockMgr
tests, against this change. The randomised stress test revealed some
accounting bugs, which are fixed in this patch. I'll post those tests as a
follow-on patch.

Change-Id: I612bd1cd0f0e87f7d8186e5bedd53a22f2d80832
Reviewed-on: http://gerrit.cloudera.org:8080/6414
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6c162df3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6c162df3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6c162df3

Branch: refs/heads/master
Commit: 6c162df38c1e929a6e8393733f27d56f07c3dea1
Parents: 8bdfe03
Author: Tim Armstrong <[email protected]>
Authored: Wed Apr 5 23:09:39 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Apr 18 06:27:39 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/free-lists-benchmark.cc       |  12 +-
 be/src/common/init.cc                           |  61 +-
 be/src/runtime/buffered-block-mgr.cc            |   7 +-
 be/src/runtime/bufferpool/CMakeLists.txt        |   2 +
 .../runtime/bufferpool/buffer-allocator-test.cc | 125 ++++
 be/src/runtime/bufferpool/buffer-allocator.cc   | 573 ++++++++++++++++++-
 be/src/runtime/bufferpool/buffer-allocator.h    | 192 ++++++-
 .../runtime/bufferpool/buffer-pool-counters.h   |  10 +-
 .../runtime/bufferpool/buffer-pool-internal.h   |   5 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 175 +++++-
 be/src/runtime/bufferpool/buffer-pool.cc        | 164 ++----
 be/src/runtime/bufferpool/buffer-pool.h         | 102 ++--
 be/src/runtime/bufferpool/free-list-test.cc     |  32 +-
 be/src/runtime/bufferpool/free-list.h           |  22 +-
 be/src/runtime/bufferpool/suballocator-test.cc  |  16 +-
 be/src/runtime/bufferpool/suballocator.h        |   6 +-
 be/src/runtime/bufferpool/system-allocator.cc   |  44 ++
 be/src/runtime/bufferpool/system-allocator.h    |  50 ++
 be/src/runtime/disk-io-mgr.cc                   |  18 +-
 be/src/runtime/disk-io-mgr.h                    |  10 +-
 be/src/runtime/exec-env.cc                      |  22 +-
 be/src/runtime/mem-tracker.cc                   |  21 +-
 be/src/runtime/mem-tracker.h                    |  15 +-
 be/src/runtime/tmp-file-mgr-test.cc             |   4 +-
 be/src/runtime/tmp-file-mgr.cc                  |   6 +-
 be/src/runtime/tmp-file-mgr.h                   |  44 +-
 be/src/testutil/cpu-util.h                      |  74 +++
 be/src/testutil/rand-util.h                     |  48 ++
 be/src/util/cpu-info.cc                         |  26 +
 be/src/util/cpu-info.h                          |  50 +-
 30 files changed, 1569 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/benchmarks/free-lists-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/free-lists-benchmark.cc b/be/src/benchmarks/free-lists-benchmark.cc
index 43caeb6..5c09b39 100644
--- a/be/src/benchmarks/free-lists-benchmark.cc
+++ b/be/src/benchmarks/free-lists-benchmark.cc
@@ -27,8 +27,8 @@
 
 #include "common/object-pool.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/free-list.h"
+#include "runtime/bufferpool/system-allocator.h"
 #include "util/aligned-new.h"
 #include "util/benchmark.h"
 #include "util/cpu-info.h"
@@ -320,7 +320,7 @@ static const int MAX_LIST_ENTRIES = 64;
 static const int ALLOC_OP = 0;
 static const int FREE_OP = 1;
 
-static BufferAllocator allocator(64 * 1024);
+static SystemAllocator allocator(64 * 1024);
 
 // Simulate doing some work with the buffer.
 void DoWork(uint8_t* data, int64_t len) {
@@ -359,7 +359,9 @@ void DoFree(const BenchmarkParams& params, LockedList* free_list,
       list->AddFreeBuffer(move(buffers->back()));
       if (list->Size() > MAX_LIST_ENTRIES) {
         // Discard around 1/4 of the buffers to amortise the cost of sorting.
-        list->FreeBuffers(&allocator, list->Size() - MAX_LIST_ENTRIES * 3 / 4);
+        vector<BufferHandle> buffers =
+            list->GetBuffersToFree(list->Size() - MAX_LIST_ENTRIES * 3 / 4);
+        for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
       }
     } else {
       allocator.Free(move(buffers->back()));
@@ -417,7 +419,9 @@ void FreeListBenchmark(int batch_size, void* data) {
 
   // Empty out all of the free lists.
   for (LockedList* free_list : free_lists) {
-    free_list->list.FreeAll(&allocator);
+    vector<BufferHandle> buffers =
+        free_list->list.GetBuffersToFree(free_list->list.Size());
+    for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 16f0f41..4c90fa3 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -27,6 +27,7 @@
 #include "gutil/atomicops.h"
 #include "rpc/authentication.h"
 #include "rpc/thrift-util.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/decimal-value.h"
 #include "runtime/exec-env.h"
 #include "runtime/hdfs-fs-cache.h"
@@ -68,6 +69,9 @@ DEFINE_int32(max_audit_event_log_files, 0, "Maximum number of audit event log fi
     "to retain. The most recent audit event log files are retained. If set to 0, "
     "all audit event log files are retained.");
 
+DEFINE_int32(memory_maintenance_sleep_time_ms, 1000, "Sleep time in milliseconds "
+    "between memory maintenance iterations");
+
 DEFINE_int64(pause_monitor_sleep_time_ms, 500, "Sleep time in milliseconds for "
     "pause monitor thread.");
 
@@ -92,27 +96,45 @@ static const float TCMALLOC_RELEASE_FREE_MEMORY_FRACTION = 0.5f;
 
 using std::string;
 
-// Maintenance thread that runs periodically. It does a few things:
-// 1) flushes glog every logbufsecs sec. glog flushes the log file only if
-//    logbufsecs has passed since the previous flush when a new log is written. That means
-//    that on a quiet system, logs will be buffered indefinitely.
-// 2) checks that tcmalloc has not left too much memory in its pageheap
-static scoped_ptr<impala::Thread> maintenance_thread;
+// Log maintenance thread that runs periodically. It flushes glog every logbufsecs sec.
+// glog only automatically flushes the log file if logbufsecs has passed since the
+// previous flush when a new log is written. That means that on a quiet system, logs
+// will be buffered indefinitely. It also rotates log files.
+static scoped_ptr<impala::Thread> log_maintenance_thread;
+
+// Memory Maintenance thread that runs periodically to free up memory. It does the
+// following things every memory_maintenance_sleep_time_ms secs:
+// 1) Releases BufferPool memory that is not currently in use.
+// 2) Frees excess memory that TCMalloc has left in its pageheap.
+static scoped_ptr<impala::Thread> memory_maintenance_thread;
 
 // A pause monitor thread to monitor process pauses in impala daemons. The thread sleeps
 // for a short interval of time (THREAD_SLEEP_TIME_MS), wakes up and calculates the actual
 // time slept. If that exceeds PAUSE_WARN_THRESHOLD_MS, a warning is logged.
 static scoped_ptr<impala::Thread> pause_monitor;
 
-[[noreturn]] static void MaintenanceThread() {
+[[noreturn]] static void LogMaintenanceThread() {
   while (true) {
     sleep(FLAGS_logbufsecs);
 
     google::FlushLogFiles(google::GLOG_INFO);
 
-    // Tests don't need to run the maintenance thread. It causes issues when
-    // on teardown.
+    // No need to rotate log files in tests.
     if (impala::TestInfo::is_test()) continue;
+    // Check for log rotation in every interval of the maintenance thread
+    impala::CheckAndRotateLogFiles(FLAGS_max_log_files);
+    // Check for audit event log rotation in every interval of the maintenance thread
+    impala::CheckAndRotateAuditEventLogFiles(FLAGS_max_audit_event_log_files);
+  }
+}
+
+[[noreturn]] static void MemoryMaintenanceThread() {
+  while (true) {
+    SleepForMs(FLAGS_memory_maintenance_sleep_time_ms);
+    impala::ExecEnv* env = impala::ExecEnv::GetInstance();
+    if (env == nullptr) continue; // ExecEnv may not have been created yet.
+    BufferPool* buffer_pool = env->buffer_pool();
+    if (buffer_pool != nullptr) buffer_pool->Maintenance();
 
 #ifndef ADDRESS_SANITIZER
     // Required to ensure memory gets released back to the OS, even if tcmalloc doesn't do
@@ -139,18 +161,12 @@ static scoped_ptr<impala::Thread> pause_monitor;
 
     // When using tcmalloc, the process limit as measured by our trackers will
     // be out of sync with the process usage. Update the process tracker periodically.
-    impala::ExecEnv* env = impala::ExecEnv::GetInstance();
     if (env != NULL && env->process_mem_tracker() != NULL) {
       env->process_mem_tracker()->RefreshConsumptionFromMetric();
     }
 #endif
     // TODO: we should also update the process mem tracker with the reported JVM
     // mem usage.
-
-    // Check for log rotation in every interval of the maintenance thread
-    impala::CheckAndRotateLogFiles(FLAGS_max_log_files);
-    // Check for audit event log rotation in every interval of the maintenance thread
-    impala::CheckAndRotateAuditEventLogFiles(FLAGS_max_audit_event_log_files);
   }
 }
 
@@ -204,11 +220,18 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   ABORT_IF_ERROR(impala::InitAuth(argv[0]));
 
   // Initialize maintenance_thread after InitGoogleLoggingSafe and InitThreading.
-  maintenance_thread.reset(
-      new Thread("common", "maintenance-thread", &MaintenanceThread));
+  log_maintenance_thread.reset(
+      new Thread("common", "log-maintenance-thread", &LogMaintenanceThread));
+
+  // Memory maintenance isn't necessary for frontend tests, and it's undesirable
+  // to asynchronously free memory in backend tests that are testing memory
+  // management behaviour.
+  if (!impala::TestInfo::is_test()) {
+    memory_maintenance_thread.reset(
+        new Thread("common", "memory-maintenance-thread", 
&MemoryMaintenanceThread));
+  }
 
-  pause_monitor.reset(
-      new Thread("common", "pause-monitor", &PauseMonitorLoop));
+  pause_monitor.reset(new Thread("common", "pause-monitor", &PauseMonitorLoop));
 
   LOG(INFO) << impala::GetVersionString();
   LOG(INFO) << "Using hostname: " << FLAGS_hostname;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index 199807b..e4737c2 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -693,9 +693,12 @@ Status BufferedBlockMgr::CancelWrite(Block* block) {
     if (is_cancelled_) return Status::CANCELLED;
   }
   if (block->write_handle_ != NULL) {
+    // Make sure the write is not in-flight.
+    block->write_handle_->Cancel();
+    block->write_handle_->WaitForWrite();
     // Restore the in-memory data without reading from disk (e.g. decrypt it).
-    RETURN_IF_ERROR(tmp_file_group_->CancelWriteAndRestoreData(
-        move(block->write_handle_), block->valid_data()));
+    RETURN_IF_ERROR(
+        tmp_file_group_->RestoreData(move(block->write_handle_), block->valid_data()));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/CMakeLists.txt b/be/src/runtime/bufferpool/CMakeLists.txt
index 9f98968..231230b 100644
--- a/be/src/runtime/bufferpool/CMakeLists.txt
+++ b/be/src/runtime/bufferpool/CMakeLists.txt
@@ -26,9 +26,11 @@ add_library(BufferPool
   buffer-pool.cc
   reservation-tracker.cc
   suballocator.cc
+  system-allocator.cc
 )
 add_dependencies(BufferPool thrift-deps)
 
+ADD_BE_TEST(buffer-allocator-test)
 ADD_BE_TEST(buffer-pool-test)
 ADD_BE_TEST(free-list-test)
 ADD_BE_TEST(reservation-tracker-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
new file mode 100644
index 0000000..515c1ae
--- /dev/null
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <vector>
+
+#include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-allocator.h"
+#include "runtime/bufferpool/buffer-pool-internal.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "testutil/cpu-util.h"
+#include "testutil/gtest-util.h"
+#include "util/cpu-info.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+class BufferAllocatorTest : public ::testing::Test {
+ public:
+  virtual void SetUp() {
+    dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0));
+    dummy_reservation_.InitRootTracker(nullptr, 0);
+    ASSERT_OK(dummy_pool_->RegisterClient("", nullptr, &dummy_reservation_, nullptr, 0,
+        obj_pool_.Add(new RuntimeProfile(&obj_pool_, "")), &dummy_client_));
+  }
+
+  virtual void TearDown() {
+    dummy_pool_->DeregisterClient(&dummy_client_);
+    dummy_reservation_.Close();
+    obj_pool_.Clear();
+    CpuTestUtil::ResetAffinity(); // Some tests modify affinity.
+  }
+
+  int GetFreeListSize(BufferPool::BufferAllocator* allocator, int core, int64_t len) {
+    return allocator->GetFreeListSize(core, len);
+  }
+
+  /// The minimum buffer size used in most tests.
+  const static int64_t TEST_BUFFER_LEN = 1024;
+
+  ObjectPool obj_pool_;
+
+  /// Need a dummy pool and client to pass around. We bypass the reservation mechanisms
+  /// in these tests so they don't need to be properly initialised.
+  BufferPool* dummy_pool_;
+  BufferPool::ClientHandle dummy_client_;
+  ReservationTracker dummy_reservation_;
+};
+
+// Functional test that makes sure the free lists cache as many buffers as expected.
+TEST_F(BufferAllocatorTest, FreeListSizes) {
+  // Run on core 0 to ensure that we always go to the same free list.
+  const int CORE = 0;
+  CpuTestUtil::PinToCore(CORE);
+
+  const int NUM_BUFFERS = 512;
+  const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
+
+  BufferPool::BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, TOTAL_BYTES);
+
+  // Allocate a bunch of buffers - all free list checks should miss.
+  vector<BufferHandle> buffers(NUM_BUFFERS);
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
+    ASSERT_OK(allocator.Allocate(&dummy_client_, TEST_BUFFER_LEN, &buffers[i]));
+  }
+
+  // Add back the allocated buffers - all should be added to the list.
+  for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
+  ASSERT_EQ(NUM_BUFFERS, GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN));
+
+  // We should be able to get back the buffers from this list.
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
+    ASSERT_OK(allocator.Allocate(&dummy_client_, TEST_BUFFER_LEN, &buffers[i]));
+    ASSERT_TRUE(buffers[i].is_open());
+  }
+  ASSERT_EQ(0, GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN));
+
+  // Add back the buffers.
+  for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
+  ASSERT_EQ(NUM_BUFFERS, GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN));
+
+  // Test DebugString().
+  LOG(INFO) << allocator.DebugString();
+
+  // Periodic maintenance should shrink the list's size each time after the first two
+  // calls, since the low water mark is the current size.
+  int maintenance_calls = 0;
+  while (GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN) > 0) {
+    int prev_size = GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN);
+    allocator.Maintenance();
+    int new_size = GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN);
+    if (maintenance_calls == 0) {
+      // The low water mark should be zero until we've called Maintenance() once.
+      EXPECT_EQ(prev_size, new_size);
+    } else {
+      // The low water mark will be the current size, so half the buffers should be freed.
+      EXPECT_EQ(prev_size == 1 ? 0 : prev_size - prev_size / 2, new_size);
+    }
+    ++maintenance_calls;
+  }
+
+  // Also exercise ReleaseMemory() - it should clear out the list entirely.
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
+    ASSERT_OK(allocator.Allocate(&dummy_client_, TEST_BUFFER_LEN, &buffers[i]));
+  }
+  for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
+  allocator.ReleaseMemory(TOTAL_BYTES);
+  ASSERT_EQ(0, GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN));
+}
+}
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.cc b/be/src/runtime/bufferpool/buffer-allocator.cc
index 7b7d216..e3c6e60 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator.cc
@@ -17,25 +17,576 @@
 
 #include "runtime/bufferpool/buffer-allocator.h"
 
-#include "util/bit-util.h"
+#include <mutex>
+
+#include <boost/bind.hpp>
+
+#include "common/atomic.h"
+#include "gutil/bits.h"
+#include "runtime/bufferpool/system-allocator.h"
+#include "util/cpu-info.h"
+#include "util/pretty-printer.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
 
 namespace impala {
 
-BufferAllocator::BufferAllocator(int64_t min_buffer_len)
-  : min_buffer_len_(min_buffer_len) {}
+/// An arena containing free buffers and clean pages that are associated with a
+/// particular core. All public methods are thread-safe.
+class BufferPool::FreeBufferArena : public CacheLineAligned {
+ public:
+  FreeBufferArena(BufferAllocator* parent);
+
+  // Destructor should only run in backend tests.
+  ~FreeBufferArena();
+
+  /// Add a free buffer to the free lists. May free buffers to the system allocator
+  /// if the list becomes full. Caller should not hold 'lock_'
+  void AddFreeBuffer(BufferHandle buffer);
+
+  /// Try to get a free buffer of 'buffer_len' bytes from this arena. Returns true and
+  /// sets 'buffer' if found or false if not found. Caller should not hold 'lock_'.
+  bool PopFreeBuffer(int64_t buffer_len, BufferHandle* buffer);
+
+  /// Try to get a buffer of 'buffer_len' bytes from this arena by evicting a clean page.
+  /// Returns true and sets 'buffer' if a clean page was evicted or false otherwise.
+  /// Caller should not hold 'lock_'
+  bool EvictCleanPage(int64_t buffer_len, BufferHandle* buffer);
+
+  /// Try to free 'target_bytes' of memory from this arena back to the system allocator.
+  /// Up to 'target_bytes_to_claim' will be given back to the caller, so it can allocate
+  /// a buffer of that size from the system. Any bytes freed in excess of
+  /// 'target_bytes_to_claim' are added to 'system_bytes_remaining_'. Returns the actual
+  /// number of bytes freed and the actual number of bytes claimed.
+  ///
+  /// Caller should not hold 'lock_'. If 'arena_lock' is non-null, ownership of the
+  /// arena lock is transferred to the caller. Uses std::unique_lock instead of
+  /// boost::unique_lock because it is movable.
+  pair<int64_t, int64_t> FreeSystemMemory(int64_t target_bytes_to_free,
+      int64_t target_bytes_to_claim, std::unique_lock<SpinLock>* arena_lock);
+
+  /// Add a clean page to the arena. Caller must hold the page's client's lock and not
+  /// hold 'lock_' or any Page::lock_.
+  void AddCleanPage(Page* page);
+
+  /// Removes the clean page from the arena if present. Returns true if removed. If
+  /// 'claim_buffer' is true, the buffer is returned with the page, otherwise it is
+  /// added to the free buffer list. Caller must hold the page's client's lock and
+  /// not hold 'lock_' or any Page::lock_.
+  bool RemoveCleanPage(bool claim_buffer, Page* page);
+
+  /// Called periodically. Shrinks free lists that are holding onto more memory than
+  /// needed.
+  void Maintenance();
+
+  /// Test helper: gets the current size of the free list for buffers of 'len' bytes
+  /// on core 'core'.
+  int GetFreeListSize(int64_t len);
+
+  string DebugString();
+
+ private:
+  /// The data structures for each power-of-two size of buffers/pages.
+  /// All members are protected by FreeBufferArena::lock_ unless otherwise mentioned.
+  struct PerSizeLists {
+    PerSizeLists() : num_free_buffers(0), low_water_mark(0), num_clean_pages(0) {}
+    /// The number of entries in 'free_buffers'. Can be read without holding a lock to
+    /// allow threads to quickly skip over empty lists when trying to find a buffer.
+    AtomicInt64 num_free_buffers;
+
+    /// Buffers that are not in use that were originally allocated on the core
+    /// corresponding to this arena.
+    FreeList free_buffers;
+
+    /// The minimum size of 'free_buffers' since the last Maintenance() call.
+    int low_water_mark;
+
+    /// The number of entries in 'clean_pages'.
+    /// Can be read without holding a lock to allow threads to quickly skip over empty
+    /// lists when trying to find a buffer in a different arena.
+    AtomicInt64 num_clean_pages;
+
+    /// Unpinned pages that have had their contents written to disk. These pages can be
+    /// evicted to reclaim a buffer for any client. Pages are evicted in FIFO order,
+    /// so that pages are evicted in approximately the same order that the clients wrote
+    /// them to disk. Protected by FreeBufferArena::lock_.
+    InternalList<Page> clean_pages;
+  };
+
+  /// Return the number of buffer sizes for this allocator.
+  int NumBufferSizes() const {
+    return parent_->log_max_buffer_len_ - parent_->log_min_buffer_len_ + 1;
+  }
+
+  /// Return the lists of buffers for buffers of the given length.
+  PerSizeLists* GetListsForSize(int64_t buffer_len) {
+    DCHECK(BitUtil::IsPowerOf2(buffer_len));
+    int idx = Bits::Log2Ceiling64(buffer_len) - parent_->log_min_buffer_len_;
+    DCHECK_LT(idx, NumBufferSizes());
+    return &buffer_sizes_[idx];
+  }
+
+  BufferAllocator* const parent_;
+
+  /// Protects all data structures in the arena. See buffer-pool-internal.h for lock
+  /// order.
+  SpinLock lock_;
 
-Status BufferAllocator::Allocate(int64_t len, BufferPool::BufferHandle* buffer) {
+  /// Free buffers and clean pages for each buffer size for this arena.
+  /// Indexed by log2(bytes) - log2(min_buffer_len_).
+  PerSizeLists buffer_sizes_[LOG_MAX_BUFFER_BYTES + 1];
+};
+
+int64_t BufferPool::BufferAllocator::CalcMaxBufferLen(
+    int64_t min_buffer_len, int64_t system_bytes_limit) {
+  // Find largest power of 2 smaller than 'system_bytes_limit'.
+  int64_t upper_bound = system_bytes_limit == 0 ? 1L : 1L
+          << Bits::Log2Floor64(system_bytes_limit);
+  upper_bound = min(MAX_BUFFER_BYTES, upper_bound);
+  return max(min_buffer_len, upper_bound); // Can't be < min_buffer_len.
+}
+
+BufferPool::BufferAllocator::BufferAllocator(
+    BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit)
+  : pool_(pool),
+    system_allocator_(new SystemAllocator(min_buffer_len)),
+    min_buffer_len_(min_buffer_len),
+    max_buffer_len_(CalcMaxBufferLen(min_buffer_len, system_bytes_limit)),
+    log_min_buffer_len_(Bits::Log2Ceiling64(min_buffer_len_)),
+    log_max_buffer_len_(Bits::Log2Ceiling64(max_buffer_len_)),
+    system_bytes_limit_(system_bytes_limit),
+    system_bytes_remaining_(system_bytes_limit),
+    per_core_arenas_(CpuInfo::GetMaxNumCores()),
+    max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS) {
+  DCHECK(BitUtil::IsPowerOf2(min_buffer_len_)) << min_buffer_len_;
+  DCHECK(BitUtil::IsPowerOf2(max_buffer_len_)) << max_buffer_len_;
+  DCHECK_LE(0, min_buffer_len_);
+  DCHECK_LE(min_buffer_len_, max_buffer_len_);
+  DCHECK_LE(max_buffer_len_, MAX_BUFFER_BYTES);
+  DCHECK_LE(max_buffer_len_, max(system_bytes_limit_, min_buffer_len_));
+
+  for (unique_ptr<FreeBufferArena>& arena : per_core_arenas_) {
+    arena.reset(new FreeBufferArena(this));
+  }
+}
+
+BufferPool::BufferAllocator::~BufferAllocator() {
+  per_core_arenas_.clear(); // Release all the memory.
+  // Check for accounting leaks.
+  DCHECK_EQ(system_bytes_limit_, system_bytes_remaining_.Load());
+}
+
+Status BufferPool::BufferAllocator::Allocate(
+    ClientHandle* client, int64_t len, BufferHandle* buffer) {
+  SCOPED_TIMER(client->impl_->counters().alloc_time);
+  COUNTER_ADD(client->impl_->counters().bytes_alloced, len);
+  COUNTER_ADD(client->impl_->counters().num_allocations, 1);
+
+  RETURN_IF_ERROR(AllocateInternal(len, buffer));
+  DCHECK(buffer->is_open());
+  buffer->client_ = client;
+  return Status::OK();
+}
+
+Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle* buffer) {
+  DCHECK(!buffer->is_open());
   DCHECK_GE(len, min_buffer_len_);
-  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+  DCHECK(BitUtil::IsPowerOf2(len)) << len;
+
+  if (UNLIKELY(len > MAX_BUFFER_BYTES)) {
+    return Status(Substitute(
+        "Tried to allocate buffer of $0 bytes > max of $1 bytes", len, 
MAX_BUFFER_BYTES));
+  }
+  if (UNLIKELY(len > system_bytes_limit_)) {
+    return Status(Substitute("Tried to allocate buffer of $0 bytes > buffer 
pool limit "
+        "of $1 bytes", len, system_bytes_limit_));
+  }
+
+  const int current_core = CpuInfo::GetCurrentCore();
+  // Fast path: recycle a buffer of the correct size from this core's arena.
+  FreeBufferArena* current_core_arena = per_core_arenas_[current_core].get();
+  if (current_core_arena->PopFreeBuffer(len, buffer)) return Status::OK();
+
+  // Fast-ish path: allocate a new buffer if there is room in 'system_bytes_remaining_'.
+  int64_t delta = DecreaseSystemBytesRemaining(len, true);
+  if (delta != len) {
+    DCHECK_EQ(0, delta);
+    const vector<int>& numa_node_cores = CpuInfo::GetCoresOfSameNumaNode(current_core);
+    const int numa_node_core_idx = CpuInfo::GetNumaNodeCoreIdx(current_core);
+
+    // Fast-ish path: find a buffer of the right size from another core on the same
+    // NUMA node. Avoid getting a buffer from another NUMA node - prefer reclaiming
+    // a clean page on this NUMA node or scavenging then reallocating a new buffer.
+    // We don't want to get into a state where allocations between the nodes are
+    // unbalanced and one node is stuck reusing memory allocated on the other node.
+    for (int i = 1; i < numa_node_cores.size(); ++i) {
+      // Each core should start searching from a different point to avoid hot-spots.
+      int other_core = numa_node_cores[(numa_node_core_idx + i) % numa_node_cores.size()];
+      FreeBufferArena* other_core_arena = per_core_arenas_[other_core].get();
+      if (other_core_arena->PopFreeBuffer(len, buffer)) return Status::OK();
+    }
+
+    // Fast-ish path: evict a clean page of the right size from the current NUMA node.
+    for (int i = 0; i < numa_node_cores.size(); ++i) {
+      int other_core = numa_node_cores[(numa_node_core_idx + i) % numa_node_cores.size()];
+      FreeBufferArena* other_core_arena = per_core_arenas_[other_core].get();
+      if (other_core_arena->EvictCleanPage(len, buffer)) return Status::OK();
+    }
 
-  uint8_t* alloc = reinterpret_cast<uint8_t*>(malloc(len));
-  if (alloc == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len);
-  buffer->Open(alloc, len);
+    // Slow path: scavenge buffers of different sizes from free buffer lists and clean
+    // pages. Make initial, fast attempts to gather the required buffers, before
+    // finally making a slower, but guaranteed-to-succeed attempt.
+    // TODO: IMPALA-4703: add a stress option where we vary the number of attempts
+    // randomly.
+    int attempt = 0;
+    while (attempt < max_scavenge_attempts_ && delta < len) {
+      bool final_attempt = attempt == max_scavenge_attempts_ - 1;
+      delta += ScavengeBuffers(final_attempt, current_core, len - delta);
+      ++attempt;
+    }
+    if (delta < len) {
+      system_bytes_remaining_.Add(delta);
+      // This indicates an accounting bug - we should be able to always get the memory.
+      return Status(TErrorCode::INTERNAL_ERROR, Substitute(
+          "Could not allocate $0 bytes: was only able to free up $1 bytes after $2 "
+          "attempts:\n$3", len, delta, max_scavenge_attempts_, pool_->DebugString()));
+    }
+  }
+  // We have headroom to allocate a new buffer at this point.
+  DCHECK_EQ(delta, len);
+  Status status = system_allocator_->Allocate(len, buffer);
+  if (!status.ok()) {
+    system_bytes_remaining_.Add(len);
+    return status;
+  }
   return Status::OK();
 }
 
-void BufferAllocator::Free(BufferPool::BufferHandle&& buffer) {
-  free(buffer.data());
-  buffer.Reset(); // Avoid DCHECK in ~BufferHandle().
+int64_t BufferPool::BufferAllocator::DecreaseSystemBytesRemaining(
+    int64_t max_decrease, bool require_full_decrease) {
+  while (true) {
+    int64_t old_value = system_bytes_remaining_.Load();
+    if (require_full_decrease && old_value < max_decrease) return 0;
+    int64_t decrease = min(old_value, max_decrease);
+    int64_t new_value = old_value - decrease;
+    if (system_bytes_remaining_.CompareAndSwap(old_value, new_value)) {
+      return decrease;
+    }
+  }
+}
+
+int64_t BufferPool::BufferAllocator::ScavengeBuffers(
+    bool slow_but_sure, int current_core, int64_t target_bytes) {
+  // There are two strategies for scavenging buffers:
+  // 1) Fast, opportunistic: Each arena is searched in succession. Although reservations
+  //    guarantee that the memory we need is available somewhere, this may fail if we
+  //    race with another thread that returned buffers to an arena that we've already
+  //    searched and took the buffers from an arena we haven't yet searched.
+  // 2) Slow, guaranteed to succeed: In order to ensure that we can find the memory in a
+  //    single pass, we hold locks for all arenas we've already examined. That way, other
+  //    threads can't take the memory that we need from an arena that we haven't yet
+  //    examined (or from 'system_bytes_remaining_') because in order to do so, it would
+  //    have had to return the equivalent amount of memory to an earlier arena or added
+  //    it back into 'system_bytes_remaining_'. The former can't happen since we're
+  //    still holding those locks, and the latter is solved by trying
+  //    DecreaseSystemBytesRemaining() at the end.
+  DCHECK_GT(target_bytes, 0);
+  // First make sure we've used up all the headroom in the buffer limit.
+  int64_t bytes_found = DecreaseSystemBytesRemaining(target_bytes, false);
+  if (bytes_found == target_bytes) return bytes_found;
+
+  // In 'slow_but_sure' mode, we will hold locks for multiple arenas at the same time and
+  // therefore must start at 0 to respect the lock order. Otherwise we start with the
+  // current core's arena for locality and to avoid excessive contention on arena 0.
+  int start_core = slow_but_sure ? 0 : current_core;
+  vector<std::unique_lock<SpinLock>> arena_locks;
+  if (slow_but_sure) arena_locks.resize(per_core_arenas_.size());
+
+  for (int i = 0; i < per_core_arenas_.size(); ++i) {
+    int core_to_check = (start_core + i) % per_core_arenas_.size();
+    FreeBufferArena* arena = per_core_arenas_[core_to_check].get();
+    int64_t bytes_needed = target_bytes - bytes_found;
+    bytes_found += arena->FreeSystemMemory(bytes_needed, bytes_needed,
+         slow_but_sure ? &arena_locks[i] : nullptr).second;
+    if (bytes_found == target_bytes) break;
+  }
+  DCHECK_LE(bytes_found, target_bytes);
+
+  // Decrement 'system_bytes_remaining_' while still holding the arena locks to avoid
+  // the window for a race with another thread that removes a buffer from a list and
+  // then increments 'system_bytes_remaining_'. The race is prevented because the other
+  // thread holds the lock while decrementing 'system_bytes_remaining_' in the cases
+  // where it may not have reservation corresponding to that memory.
+  if (slow_but_sure && bytes_found < target_bytes) {
+    bytes_found += DecreaseSystemBytesRemaining(target_bytes - bytes_found, true);
+    DCHECK_EQ(bytes_found, target_bytes) << DebugString();
+  }
+  return bytes_found;
+}
+
+void BufferPool::BufferAllocator::Free(BufferHandle&& handle) {
+  DCHECK(handle.is_open());
+  handle.client_ = nullptr; // Buffer is no longer associated with a client.
+  FreeBufferArena* arena = per_core_arenas_[handle.home_core_].get();
+  arena->AddFreeBuffer(move(handle));
+}
+
+void BufferPool::BufferAllocator::AddCleanPage(
+    const unique_lock<mutex>& client_lock, Page* page) {
+  page->client->DCheckHoldsLock(client_lock);
+  FreeBufferArena* arena = per_core_arenas_[page->buffer.home_core_].get();
+  arena->AddCleanPage(page);
+}
+
+bool BufferPool::BufferAllocator::RemoveCleanPage(
+    const unique_lock<mutex>& client_lock, bool claim_buffer, Page* page) {
+  page->client->DCheckHoldsLock(client_lock);
+  FreeBufferArena* arena;
+  {
+    lock_guard<SpinLock> pl(page->buffer_lock);
+    // Page may be evicted - in which case it has no home core and is not in an arena.
+    if (!page->buffer.is_open()) return false;
+    arena = per_core_arenas_[page->buffer.home_core_].get();
+  }
+  return arena->RemoveCleanPage(claim_buffer, page);
+}
+
+void BufferPool::BufferAllocator::Maintenance() {
+  for (unique_ptr<FreeBufferArena>& arena : per_core_arenas_) arena->Maintenance();
+}
+
+void BufferPool::BufferAllocator::ReleaseMemory(int64_t bytes_to_free) {
+  int64_t bytes_freed = 0;
+  int current_core = CpuInfo::GetCurrentCore();
+  for (int i = 0; i < per_core_arenas_.size(); ++i) {
+    int core_to_check = (current_core + i) % per_core_arenas_.size();
+    FreeBufferArena* arena = per_core_arenas_[core_to_check].get();
+    // Free but don't claim any memory.
+    bytes_freed += arena->FreeSystemMemory(bytes_to_free - bytes_freed, 0, nullptr).first;
+    if (bytes_freed >= bytes_to_free) return;
+  }
+}
+
+int BufferPool::BufferAllocator::GetFreeListSize(int core, int64_t len) {
+  return per_core_arenas_[core]->GetFreeListSize(len);
+}
+
+int64_t BufferPool::BufferAllocator::FreeToSystem(vector<BufferHandle>&& buffers) {
+  int64_t bytes_freed = 0;
+  for (BufferHandle& buffer : buffers) {
+    bytes_freed += buffer.len();
+    system_allocator_->Free(move(buffer));
+  }
+  return bytes_freed;
+}
+
+string BufferPool::BufferAllocator::DebugString() {
+  stringstream ss;
+  ss << "<BufferAllocator> " << this << " min_buffer_len: " << min_buffer_len_
+     << " system_bytes_limit: " << system_bytes_limit_
+     << " system_bytes_remaining: " << system_bytes_remaining_.Load() << "\n";
+  for (int i = 0; i < per_core_arenas_.size(); ++i) {
+    ss << "  Arena " << i << " " << per_core_arenas_[i]->DebugString() << "\n";
+  }
+  return ss.str();
+}
+
+BufferPool::FreeBufferArena::FreeBufferArena(BufferAllocator* parent) : parent_(parent) {}
+
+BufferPool::FreeBufferArena::~FreeBufferArena() {
+  for (int i = 0; i < NumBufferSizes(); ++i) {
+    // Clear out the free lists.
+    FreeList* list = &buffer_sizes_[i].free_buffers;
+    vector<BufferHandle> buffers = list->GetBuffersToFree(list->Size());
+    parent_->system_bytes_remaining_.Add(parent_->FreeToSystem(move(buffers)));
+
+    // All pages should have been destroyed.
+    DCHECK_EQ(0, buffer_sizes_[i].clean_pages.size());
+  }
+}
+
+void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle buffer) {
+  lock_guard<SpinLock> al(lock_);
+  PerSizeLists* lists = GetListsForSize(buffer.len());
+  FreeList* list = &lists->free_buffers;
+  DCHECK_EQ(lists->num_free_buffers.Load(), list->Size());
+  lists->num_free_buffers.Add(1);
+  list->AddFreeBuffer(move(buffer));
+}
+
+bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page* page) {
+  lock_guard<SpinLock> al(lock_);
+  PerSizeLists* lists = GetListsForSize(page->len);
+  DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
+  if (!lists->clean_pages.Remove(page)) return false;
+  lists->num_clean_pages.Add(-1);
+  if (!claim_buffer) {
+    BufferHandle buffer;
+    {
+      lock_guard<SpinLock> pl(page->buffer_lock);
+      buffer = move(page->buffer);
+    }
+    lists->free_buffers.AddFreeBuffer(move(buffer));
+    lists->num_free_buffers.Add(1);
+  }
+  return true;
+}
+
+bool BufferPool::FreeBufferArena::PopFreeBuffer(
+    int64_t buffer_len, BufferHandle* buffer) {
+  PerSizeLists* lists = GetListsForSize(buffer_len);
+  // Check before acquiring lock.
+  if (lists->num_free_buffers.Load() == 0) return false;
+
+  lock_guard<SpinLock> al(lock_);
+  FreeList* list = &lists->free_buffers;
+  DCHECK_EQ(lists->num_free_buffers.Load(), list->Size());
+  if (!list->PopFreeBuffer(buffer)) return false;
+  lists->num_free_buffers.Add(-1);
+  lists->low_water_mark = min<int>(lists->low_water_mark, list->Size());
+  return true;
+}
+
+bool BufferPool::FreeBufferArena::EvictCleanPage(
+    int64_t buffer_len, BufferHandle* buffer) {
+  PerSizeLists* lists = GetListsForSize(buffer_len);
+  // Check before acquiring lock.
+  if (lists->num_clean_pages.Load() == 0) return false;
+
+  lock_guard<SpinLock> al(lock_);
+  DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
+  Page* page = lists->clean_pages.Dequeue();
+  if (page == nullptr) return false;
+  lists->num_clean_pages.Add(-1);
+  lock_guard<SpinLock> pl(page->buffer_lock);
+  *buffer = move(page->buffer);
+  return true;
+}
+
+pair<int64_t, int64_t> BufferPool::FreeBufferArena::FreeSystemMemory(
+    int64_t target_bytes_to_free, int64_t target_bytes_to_claim,
+    std::unique_lock<SpinLock>* arena_lock) {
+  DCHECK_GT(target_bytes_to_free, 0);
+  DCHECK_GE(target_bytes_to_free, target_bytes_to_claim);
+  int64_t bytes_freed = 0;
+  // If the caller is acquiring the lock, just lock for the whole method.
+  // Otherwise lazily acquire the lock the first time we find some memory
+  // to free.
+  std::unique_lock<SpinLock> al(lock_, std::defer_lock_t());
+  if (arena_lock != nullptr) al.lock();
+
+  vector<BufferHandle> buffers;
+  // Search from largest to smallest to avoid freeing many small buffers unless
+  // necessary.
+  for (int i = NumBufferSizes() - 1; i >= 0; --i) {
+    PerSizeLists* lists = &buffer_sizes_[i];
+    // Check before acquiring lock to avoid expensive lock acquisition and make scanning
+    // empty lists much cheaper.
+    if (lists->num_free_buffers.Load() == 0 && lists->num_clean_pages.Load() == 0) {
+      continue;
+    }
+    if (!al.owns_lock()) al.lock();
+    FreeList* free_buffers = &lists->free_buffers;
+    InternalList<Page>* clean_pages = &lists->clean_pages;
+    DCHECK_EQ(lists->num_free_buffers.Load(), free_buffers->Size());
+    DCHECK_EQ(lists->num_clean_pages.Load(), clean_pages->size());
+
+    // Figure out how many of the buffers in the free list we should free.
+    DCHECK_GT(target_bytes_to_free, bytes_freed);
+    const int64_t buffer_len = 1L << (i + parent_->log_min_buffer_len_);
+    int64_t buffers_to_free = min(free_buffers->Size(),
+        BitUtil::Ceil(target_bytes_to_free - bytes_freed, buffer_len));
+    int64_t buffer_bytes_to_free = buffers_to_free * buffer_len;
+
+    // Evict clean pages by moving their buffers to the free page list before freeing
+    // them. This ensures that they are freed based on memory address in the expected
+    // order.
+    while (bytes_freed + buffer_bytes_to_free < target_bytes_to_free) {
+      Page* page = clean_pages->Dequeue();
+      if (page == nullptr) break;
+      lists->num_clean_pages.Add(-1);
+      BufferHandle page_buffer;
+      {
+        lock_guard<SpinLock> pl(page->buffer_lock);
+        page_buffer = move(page->buffer);
+      }
+      ++buffers_to_free;
+      buffer_bytes_to_free += page_buffer.len();
+      free_buffers->AddFreeBuffer(move(page_buffer));
+      lists->num_free_buffers.Add(1);
+    }
+    if (buffers_to_free > 0) {
+      int64_t buffer_bytes_freed =
+          parent_->FreeToSystem(free_buffers->GetBuffersToFree(buffers_to_free));
+      DCHECK_EQ(buffer_bytes_to_free, buffer_bytes_freed);
+      bytes_freed += buffer_bytes_to_free;
+      lists->num_free_buffers.Add(-buffers_to_free);
+      lists->low_water_mark = min<int>(lists->low_water_mark, free_buffers->Size());
+      if (bytes_freed >= target_bytes_to_free) break;
+    }
+    // Should have cleared out all lists if we don't have enough memory at this point.
+    DCHECK_EQ(0, free_buffers->Size());
+    DCHECK_EQ(0, clean_pages->size());
+  }
+  int64_t bytes_claimed = min(bytes_freed, target_bytes_to_claim);
+  if (bytes_freed > bytes_claimed) {
+    // Add back the extra for other threads before releasing the lock to avoid race
+    // where the other thread may not be able to find enough buffers.
+    parent_->system_bytes_remaining_.Add(bytes_freed - bytes_claimed);
+  }
+  if (arena_lock != nullptr) *arena_lock = move(al);
+  return make_pair(bytes_freed, bytes_claimed);
+}
+
+void BufferPool::FreeBufferArena::AddCleanPage(Page* page) {
+  lock_guard<SpinLock> al(lock_);
+  PerSizeLists* lists = GetListsForSize(page->len);
+  DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
+  lists->clean_pages.Enqueue(page);
+  lists->num_clean_pages.Add(1);
+}
+
+void BufferPool::FreeBufferArena::Maintenance() {
+  lock_guard<SpinLock> al(lock_);
+  for (int i = 0; i < NumBufferSizes(); ++i) {
+    PerSizeLists* lists = &buffer_sizes_[i];
+    DCHECK_LE(lists->low_water_mark, lists->free_buffers.Size());
+    if (lists->low_water_mark != 0) {
+      // We haven't needed the buffers below the low water mark since the previous
+      // Maintenance() call. Discard half of them to free up memory. By always discarding
+      // at least one, we guarantee that an idle list will shrink to zero entries.
+      int num_to_free = max(1, lists->low_water_mark / 2);
+      parent_->system_bytes_remaining_.Add(
+          parent_->FreeToSystem(lists->free_buffers.GetBuffersToFree(num_to_free)));
+      lists->num_free_buffers.Add(-num_to_free);
+    }
+    lists->low_water_mark = lists->free_buffers.Size();
+  }
+}
+
+int BufferPool::FreeBufferArena::GetFreeListSize(int64_t len) {
+  lock_guard<SpinLock> al(lock_);
+  PerSizeLists* lists = GetListsForSize(len);
+  DCHECK_EQ(lists->num_free_buffers.Load(), lists->free_buffers.Size());
+  return lists->free_buffers.Size();
+}
+
+string BufferPool::FreeBufferArena::DebugString() {
+  lock_guard<SpinLock> al(lock_);
+  stringstream ss;
+  ss << "<FreeBufferArena> " << this << "\n";
+  for (int i = 0; i < NumBufferSizes(); ++i) {
+    int64_t buffer_len = 1L << (parent_->log_min_buffer_len_ + i);
+    PerSizeLists& lists = buffer_sizes_[i];
+    ss << "  " << PrettyPrinter::PrintBytes(buffer_len) << ":"
+       << " free buffers: " << lists.num_free_buffers.Load()
+       << " low water mark: " << lists.low_water_mark
+       << " clean pages: " << lists.num_clean_pages.Load() << " ";
+    lists.clean_pages.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
+
+    ss << "\n";
+  }
+  return ss.str();
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h
index 4358f75..fc79970 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.h
+++ b/be/src/runtime/bufferpool/buffer-allocator.h
@@ -18,32 +18,196 @@
 #ifndef IMPALA_RUNTIME_BUFFER_ALLOCATOR_H
 #define IMPALA_RUNTIME_BUFFER_ALLOCATOR_H
 
-#include "common/status.h"
+#include <boost/scoped_ptr.hpp>
 
-#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/buffer-pool-internal.h"
+#include "runtime/bufferpool/free-list.h"
+#include "util/aligned-new.h"
 
 namespace impala {
 
-/// The underlying memory allocator for the buffer pool. All buffers are allocated through
-/// the BufferPool's BufferAllocator. The allocator only handles allocating buffers that
-/// are power-of-two multiples of the minimum buffer length.
+/// The internal buffer allocator used by BufferPool to allocate power-of-two sized
+/// buffers. BufferAllocator builds on top of SystemAllocator by adding caching of
+/// free buffers and clean pages where the memory is not currently in use by a client
+/// but has not yet been released to SystemAllocator.
 ///
-/// TODO:
-/// * Allocate memory with mmap() instead of malloc().
-/// * Implement free lists in the allocator or external to the allocator.
-class BufferAllocator {
+/// The allocator is optimised for the common case where an allocation can be served
+/// by reclaiming a buffer of the request size from the current core's arena. In this
+/// case there is no contention for locks between concurrently-running threads. If this
+/// fails, progressively more expensive approaches to allocate memory are tried until
+/// the allocation eventually succeeds (see AllocateInternal() for details).
+///
+/// Buffer Reservations
+/// ===================
+/// The implementation of the BufferAllocator relies on the BufferPool's reservation
+/// tracking system. The allocator is given a hard limit ('system_bytes_limit'), above
+/// which all allocations will fail. Allocations up to 'system_bytes_limit' are
+/// guaranteed to succeed unless an unexpected system error occurs (e.g. we can't allocate
+/// all of the required memory from the OS). Reservations must be set up so that the total
+/// of all reservations does not exceed 'system_bytes_limit', thus ensuring that
+/// BufferAllocator can always find memory to fulfill reservations.
+///
+/// +========================+
+/// | IMPLEMENTATION NOTES   |
+/// +========================+
+///
+/// Memory
+/// ======
+/// Memory managed by BufferAllocator comes in four forms:
+/// 1. Buffers returned to the client (corresponding to a used reservation)
+/// 2. Free buffers cached in the BufferAllocator's free lists.
+/// 3. Buffers attached to clean unpinned pages in the BufferAllocator's clean page lists.
+/// 4. Bytes that are not allocated from the system: 'system_bytes_remaining_'.
+/// Together these always add up to 'system_bytes_limit', which allows BufferAllocator
+/// to always fulfill reservations via some combination of memory in forms 2, 3 or 4.
+///
+/// The BufferAllocator code is careful not to make memory inaccessible to concurrently
+/// executing threads that are entitled to it. E.g. if one thread is entitled to allocate
+/// a 1MB buffer from the BufferAllocator's free or clean page lists but needs to release
+/// a 2MB buffer to the system to free up enough memory, it must add 1MB to
+/// 'system_bytes_remaining_' in the same critical section in which it freed the 2MB
+/// buffer. Otherwise a concurrent thread that had a reservation for 1MB of memory might
+/// not be able to find it.
+///
+/// Arenas
+/// ======
+/// The buffer allocator's data structures are broken up into arenas, with an arena per
+/// core. Within each arena, each buffer or page is stored in a list with buffers and
+/// pages of the same size: there is a separate list for every power-of-two size. Each
+/// arena is protected by a separate lock, so in the common case where threads are able
+/// to fulfill allocations from their own arena, there will be no lock contention.
+///
+class BufferPool::BufferAllocator {
  public:
-  BufferAllocator(int64_t min_buffer_len);
+  BufferAllocator(BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit);
+  ~BufferAllocator();
 
-  /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple
-  /// of the minimum buffer length.
-  Status Allocate(int64_t len, BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT;
+  /// Allocate a buffer with a power-of-two length 'len'. This function may acquire
+  /// 'FreeBufferArena::lock_' and Page::lock so no locks lower in the lock acquisition
+  /// order (see buffer-pool-internal.h) should be held by the caller.
+  ///
+  /// Always succeeds on allocating memory up to 'system_bytes_limit', unless the system
+  /// is unable to give us 'system_bytes_limit' of memory or an internal bug: if all
+  /// clients write out enough dirty pages to stay within their reservation, then there
+  /// should always be enough free buffers and clean pages to reclaim.
+  Status Allocate(ClientHandle* client, int64_t len,
+      BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT;
 
-  /// Free the memory for a previously-allocated buffer.
+  /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates
+  /// internal state but does not release to any reservation.
   void Free(BufferPool::BufferHandle&& buffer);
 
+  /// Adds a clean page 'page' to a clean page list. Caller must hold the page's
+  /// client's lock via 'client_lock' so that moving the page between the client list and
+  /// the free page list is atomic. Caller must not hold 'FreeBufferArena::lock_' or any
+  /// Page::lock.
+  void AddCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* page);
+
+  /// Removes a clean page 'page' from a clean page list, if present in one of the
+  /// lists. Returns true if it was present. If 'claim_buffer' is true, the
+  /// caller must have reservation for the buffer, which is returned along with the page.
+  /// Otherwise the buffer is moved directly to the free buffer list. Caller must hold
+  /// the page's client's lock via 'client_lock' so that moving the page between the
+  /// client list and the free page list is atomic. Caller must not hold
+  /// 'FreeBufferArena::lock_' or any Page::lock.
+  bool RemoveCleanPage(
+      const boost::unique_lock<boost::mutex>& client_lock, bool claim_buffer, Page* page);
+
+  /// Periodically called to release free buffers back to the SystemAllocator. Releases
+  /// buffers based on recent allocation patterns, trying to minimise the number of
+  /// excess buffers retained in each list above the minimum required to avoid going
+  /// to the system allocator.
+  void Maintenance();
+
+  /// Try to release at least 'bytes_to_free' bytes of memory to the system allocator.
+  void ReleaseMemory(int64_t bytes_to_free);
+
+  std::string DebugString();
+
+ protected:
+  friend class BufferAllocatorTest;
+  friend class BufferPoolTest;
+  friend class FreeBufferArena;
+
+  /// Test helper: gets the current size of the free list for buffers of 'len' bytes
+  /// on core 'core'.
+  int GetFreeListSize(int core, int64_t len);
+
+  /// Test helper: reduce the number of scavenge attempts so backend tests can force
+  /// use of the "locked" scavenging code path.
+  void set_max_scavenge_attempts(int val) {
+    DCHECK_GE(val, 1);
+    max_scavenge_attempts_ = val;
+  }
+
  private:
+  /// Compute the maximum power-of-two buffer length that could be allocated based on the
+  /// amount of memory available 'system_bytes_limit'. The value is always at least
+  /// 'min_buffer_len' so that there is at least one valid buffer size.
+  static int64_t CalcMaxBufferLen(int64_t min_buffer_len, int64_t system_bytes_limit);
+
+  /// Same as Allocate() but leaves 'buffer->client_' NULL and does not update counters.
+  Status AllocateInternal(
+      int64_t len, BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT;
+
+  /// Decrease 'system_bytes_remaining_' by up to 'max_decrease', down to a minimum of 0.
+  /// If 'require_full_decrease' is true, only decrease if we can decrease it by
+  /// 'max_decrease'. Returns the amount it was decreased by.
+  int64_t DecreaseSystemBytesRemaining(int64_t max_decrease, bool require_full_decrease);
+
+  /// Tries to reclaim enough memory from various sources so that the caller can allocate
+  /// a buffer of 'target_bytes' from the system allocator. Scavenges buffers from the
+  /// free buffer and clean page lists of all cores and frees them with
+  /// 'system_allocator_'. Also tries to decrement 'system_bytes_remaining_'.
+  /// 'current_core' is the index of the current CPU core. Any bytes freed in excess of
+  /// 'target_bytes' are added to 'system_bytes_remaining_.' If 'slow_but_sure' is true,
+  /// this function uses a slower strategy that guarantees enough memory will be found
+  /// but can block progress of other threads for longer. If 'slow_but_sure' is false,
+  /// then this function optimistically tries to reclaim the memory but may not reclaim
+  /// 'target_bytes' of memory. Returns the number of bytes reclaimed.
+  int64_t ScavengeBuffers(bool slow_but_sure, int current_core, int64_t target_bytes);
+
+  /// Helper to free a list of buffers to the system. Returns the number of bytes freed.
+  int64_t FreeToSystem(std::vector<BufferHandle>&& buffers);
+
+  /// The pool that this allocator is associated with.
+  BufferPool* const pool_;
+
+  /// System allocator that is ultimately used to allocate and free buffers.
+  const boost::scoped_ptr<SystemAllocator> system_allocator_;
+
+  /// The minimum power-of-two buffer length that can be allocated.
   const int64_t min_buffer_len_;
+
+  /// The maximum power-of-two buffer length that can be allocated. Always >=
+  /// 'min_buffer_len' so that there is at least one valid buffer size.
+  const int64_t max_buffer_len_;
+
+  /// The log2 of 'min_buffer_len_'.
+  const int log_min_buffer_len_;
+
+  /// The log2 of 'max_buffer_len_'.
+  const int log_max_buffer_len_;
+
+  /// The maximum physical memory in bytes that will be allocated from the system.
+  const int64_t system_bytes_limit_;
+
+  /// The remaining number of bytes of 'system_bytes_limit_' that can be used for
+  /// allocating new buffers. Must be updated atomically before a new buffer is
+  /// allocated or after an existing buffer is freed with the system allocator.
+  AtomicInt64 system_bytes_remaining_;
+
+  /// Free and clean pages. One arena per core.
+  std::vector<std::unique_ptr<FreeBufferArena>> per_core_arenas_;
+
+  /// Default number of times to attempt scavenging.
+  static const int MAX_SCAVENGE_ATTEMPTS = 3;
+
+  /// Number of times to attempt scavenging. Usually MAX_SCAVENGE_ATTEMPTS but can be
+  /// overridden by tests. The first max_scavenge_attempts_ - 1 attempts do not lock
+  /// all arenas so may fail. The final attempt locks all arenas, which is expensive
+  /// but is guaranteed to succeed.
+  int max_scavenge_attempts_;
 };
 }
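
The class above only declares the machinery; as a rough mental model, the allocation path tries the cheapest source first and falls back to scavenging other cores' arenas only as a last resort. The following is a deliberately simplified, standalone sketch of that pattern - MiniArena and MiniAllocator are invented names, and it ignores clean pages, buffer size classes, NUMA, and the retry loop controlled by max_scavenge_attempts_; the real logic lives in AllocateInternal() in buffer-allocator.cc:

  #include <cstdint>
  #include <iostream>
  #include <mutex>
  #include <vector>

  // One arena per core: a mutex-protected cache of "free buffer" lengths.
  struct MiniArena {
    std::mutex lock;
    std::vector<int64_t> free_buffers;
  };

  class MiniAllocator {
   public:
    MiniAllocator(int num_cores, int64_t limit)
      : arenas_(num_cores), bytes_remaining_(limit) {}

    // Returns true if a buffer of 'len' bytes could be handed out.
    bool Allocate(int core, int64_t len) {
      // 1. Fast path: reuse a buffer cached on the caller's core. Only that core's
      //    arena lock is touched, so other cores do not contend with this allocation.
      if (TakeFromArena(&arenas_[core], len)) return true;
      // 2. Allocate fresh memory if there is headroom under the limit.
      if (bytes_remaining_ >= len) {
        bytes_remaining_ -= len;
        return true;
      }
      // 3. Slow path: scavenge every core's arena, visiting them in index order.
      for (MiniArena& arena : arenas_) {
        if (TakeFromArena(&arena, len)) return true;
      }
      return false;
    }

    void Free(int core, int64_t len) {
      std::lock_guard<std::mutex> l(arenas_[core].lock);
      arenas_[core].free_buffers.push_back(len);
    }

   private:
    static bool TakeFromArena(MiniArena* arena, int64_t len) {
      std::lock_guard<std::mutex> l(arena->lock);
      for (size_t i = 0; i < arena->free_buffers.size(); ++i) {
        if (arena->free_buffers[i] == len) {
          arena->free_buffers.erase(arena->free_buffers.begin() + i);
          return true;
        }
      }
      return false;
    }

    std::vector<MiniArena> arenas_;
    int64_t bytes_remaining_;
  };

  int main() {
    MiniAllocator alloc(/*num_cores=*/2, /*limit=*/64);
    alloc.Allocate(0, 64);                            // Uses up all the headroom.
    alloc.Free(0, 64);                                // Cached on core 0's free list.
    std::cout << alloc.Allocate(1, 64) << std::endl;  // Scavenged from core 0; prints 1.
    return 0;
  }

The point of the layout is that the common case touches only the current core's list, so concurrent allocations on different cores do not contend on a shared lock.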
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool-counters.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-counters.h b/be/src/runtime/bufferpool/buffer-pool-counters.h
index 183742f..58257de 100644
--- a/be/src/runtime/bufferpool/buffer-pool-counters.h
+++ b/be/src/runtime/bufferpool/buffer-pool-counters.h
@@ -25,8 +25,14 @@ namespace impala {
 /// A set of counters for each buffer pool client.
 struct BufferPoolClientCounters {
  public:
-  /// Amount of time spent trying to get a buffer.
-  RuntimeProfile::Counter* get_buffer_time;
+  /// Total amount of time spent inside BufferAllocator::AllocateBuffer().
+  RuntimeProfile::Counter* alloc_time;
+
+  /// Number of buffers allocated via BufferAllocator::AllocateBuffer().
+  RuntimeProfile::Counter* num_allocations;
+
+  /// Bytes of buffers allocated via BufferAllocator::AllocateBuffer().
+  RuntimeProfile::Counter* bytes_alloced;
 
   /// Amount of time spent waiting for reads from disk to complete.
   RuntimeProfile::Counter* read_wait_time;
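
The rename above reflects that timing now happens around the BufferAllocator call rather than in BufferPool::AllocateBufferInternal(), which this patch removes. As a hypothetical sketch only (the real update site is in buffer-allocator.cc, which is not part of this excerpt), the new counters would be bumped roughly like this:

  // Inside something like BufferAllocator::Allocate(client, len, buffer) -- sketch only.
  SCOPED_TIMER(client->impl_->counters().alloc_time);
  COUNTER_ADD(client->impl_->counters().num_allocations, 1);
  COUNTER_ADD(client->impl_->counters().bytes_alloced, len);
  RETURN_IF_ERROR(AllocateInternal(len, buffer));  // Leaves buffer->client_ NULL.
  buffer->client_ = client;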

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index f0dfa09..3428087 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -25,7 +25,8 @@
 /// =============
 /// The lock acquisition order is:
 /// 1. Client::lock_
-/// 2. BufferPool::clean_pages_lock_
+/// 2. FreeBufferArena::lock_. If multiple arena locks are acquired, must be acquired in
+///    ascending order.
 /// 3. Page::lock
 ///
 /// If a reference to a Page is acquired through a page list, the Page* reference only
@@ -44,7 +45,7 @@
 ///     a dirty unpinned page. The page is in Client::write_in_flight_pages_. For
 ///     accounting purposes this is considered a dirty page.
 /// * Unpinned - Clean: When the write has completed but the page was not evicted. The
-///     page is in BufferPool::clean_pages_.
+///     page is in a clean pages list in a BufferAllocator arena.
 /// * Unpinned - Evicted: After a clean page's buffer has been reclaimed. The page is
 ///     not in any list.
 ///
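
Since the clean-page and free-buffer lists are now per-arena, the deadlock-avoidance rule in point 2 matters whenever a thread has to hold several arena locks at once, as in the fully locked scavenge pass. A standalone illustration of the rule, not Impala code:

  #include <mutex>
  #include <vector>

  // Acquire every arena lock in ascending index order. Any code path that takes
  // multiple arena locks this way cannot deadlock against another such path.
  std::vector<std::unique_lock<std::mutex>> LockAllArenas(std::vector<std::mutex>* arena_locks) {
    std::vector<std::unique_lock<std::mutex>> held;
    held.reserve(arena_locks->size());
    for (std::mutex& m : *arena_locks) {  // Vector order == ascending arena index.
      held.emplace_back(m);               // Blocks until this arena's lock is acquired.
    }
    return held;  // All locks release when the returned vector is destroyed.
  }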

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index c71ab60..42344f6 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -27,13 +27,16 @@
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
 #include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/buffer-pool-internal.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/test-env.h"
 #include "service/fe-support.h"
+#include "testutil/cpu-util.h"
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 #include "util/metrics.h"
 
 #include "common/names.h"
@@ -47,6 +50,7 @@ class BufferPoolTest : public ::testing::Test {
   virtual void SetUp() {
     test_env_ = obj_pool_.Add(new TestEnv);
     ASSERT_OK(test_env_->Init());
+    RandTestUtil::SeedRng("BUFFER_POOL_TEST_SEED", &rng_);
   }
 
   virtual void TearDown() {
@@ -59,6 +63,7 @@ class BufferPoolTest : public ::testing::Test {
     }
     global_reservations_.Close();
     obj_pool_.Clear();
+    CpuTestUtil::ResetAffinity(); // Some tests modify affinity.
   }
 
   /// The minimum buffer size used in most tests.
@@ -105,11 +110,69 @@ class BufferPoolTest : public ::testing::Test {
     return !page->page_->buffer.is_open();
   }
 
+  /// Allocate buffers of varying sizes at most 'max_buffer_size' that add up to
+  /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
+  /// If 'randomize_core' is true, switches the thread to a random core before each
+  /// allocation.
+  void AllocateBuffers(BufferPool* pool, BufferPool::ClientHandle* client,
+      int64_t max_buffer_size, int64_t total_bytes,
+      vector<BufferPool::BufferHandle>* buffers, bool randomize_core = false) {
+    int64_t curr_buffer_size = max_buffer_size;
+    int64_t bytes_remaining = total_bytes;
+    while (bytes_remaining > 0) {
+      while (curr_buffer_size > client->GetUnusedReservation()) curr_buffer_size /= 2;
+      if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_);
+      buffers->emplace_back();
+      ASSERT_OK(pool->AllocateBuffer(client, curr_buffer_size, &buffers->back()));
+      bytes_remaining -= curr_buffer_size;
+    }
+  }
+
+  /// Create pages of varying sizes at most 'max_page_size' that add up to
+  /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
+  /// If 'randomize_core' is true, switches the thread to a random core before each
+  /// allocation.
+  void CreatePages(BufferPool* pool, BufferPool::ClientHandle* client,
+      int64_t max_page_size, int64_t total_bytes, vector<BufferPool::PageHandle>* pages,
+      bool randomize_core = false) {
+    int64_t curr_page_size = max_page_size;
+    int64_t bytes_remaining = total_bytes;
+    while (bytes_remaining > 0) {
+      while (curr_page_size > client->GetUnusedReservation()) curr_page_size /= 2;
+      pages->emplace_back();
+      if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_);
+      ASSERT_OK(pool->CreatePage(client, curr_page_size, &pages->back()));
+      bytes_remaining -= curr_page_size;
+    }
+  }
+
+  /// Free all the 'buffers' and clear the vector.
+  /// If 'randomize_core' is true, switches the thread to a random core before each
+  /// free.
+  void FreeBuffers(BufferPool* pool, BufferPool::ClientHandle* client,
+      vector<BufferPool::BufferHandle>* buffers, bool randomize_core = false) {
+    for (auto& buffer : *buffers) {
+      if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_);
+      pool->FreeBuffer(client, &buffer);
+    }
+    buffers->clear();
+  }
+
+  /// Set the maximum number of scavenge attempts that the pool's allocator will do.
+  void SetMaxScavengeAttempts(BufferPool* pool, int max_attempts) {
+    pool->allocator()->set_max_scavenge_attempts(max_attempts);
+  }
+
+  void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
+
   ObjectPool obj_pool_;
   ReservationTracker global_reservations_;
 
   TestEnv* test_env_; // Owned by 'obj_pool_'.
 
+  /// Per-test random number generator. Seeded before every test.
+  std::mt19937 rng_;
+
   // The file groups created - closed at end of each test.
   vector<TmpFileMgr::FileGroup*> file_groups_;
 
@@ -668,6 +731,103 @@ TEST_F(BufferPoolTest, MultiplyPinnedPageAccounting) {
   pool.FreeBuffer(&client, &buffer);
   pool.DeregisterClient(&client);
 }
+
+// Constants for TestMemoryReclamation().
+const int MEM_RECLAMATION_NUM_CLIENTS = 2;
+// Choose a non-power-of-two so that AllocateBuffers() will allocate a mix of sizes:
+// 32 + 32 + 32 + 16 + 8 + 4 + 2 + 1
+const int64_t MEM_RECLAMATION_BUFFERS_PER_CLIENT = 127;
+const int64_t MEM_RECLAMATION_CLIENT_RESERVATION =
+    BufferPoolTest::TEST_BUFFER_LEN * MEM_RECLAMATION_BUFFERS_PER_CLIENT;
+const int64_t MEM_RECLAMATION_TOTAL_BYTES =
+    MEM_RECLAMATION_NUM_CLIENTS * MEM_RECLAMATION_CLIENT_RESERVATION;
+
+// Test that we can reclaim buffers and pages from the same arena and from other arenas.
+TEST_F(BufferPoolTest, MemoryReclamation) {
+  global_reservations_.InitRootTracker(NULL, MEM_RECLAMATION_TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, MEM_RECLAMATION_TOTAL_BYTES);
+  // Assume that all cores are online. Test various combinations of cores to validate
+  // that it can reclaim from any other core.
+  for (int src = 0; src < CpuInfo::num_cores(); ++src) {
+    // Limit the max scavenge attempts to force use of the "locked" scavenging sometimes,
+    // which would otherwise only be triggered by racing threads.
+    SetMaxScavengeAttempts(&pool, 1 + src % 3);
+    for (int j = 0; j < 4; ++j) {
+      int dst = (src + j) % CpuInfo::num_cores();
+      TestMemoryReclamation(&pool, src, dst);
+    }
+    // Test with one fixed and the other randomly changing.
+    TestMemoryReclamation(&pool, src, -1);
+    TestMemoryReclamation(&pool, -1, src);
+  }
+  // Test with both src and dst randomly changing.
+  TestMemoryReclamation(&pool, -1, -1);
+  global_reservations_.Close();
+}
+
+// Test that we can reclaim buffers and pages from the same arena or a different arena.
+// Allocates then frees memory on 'src_core' then allocates on 'dst_core' to force
+// reclamation of memory from src_core's free buffer lists and clean page lists.
+// If 'src_core' or 'dst_core' is -1, randomly switch between cores instead of sticking
+// to a fixed core.
+void BufferPoolTest::TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core) {
+  LOG(INFO) << "TestMemoryReclamation " << src_core << " -> " << dst_core;
+  const bool rand_src_core = src_core == -1;
+  const bool rand_dst_core = dst_core == -1;
+
+  BufferPool::ClientHandle clients[MEM_RECLAMATION_NUM_CLIENTS];
+  for (int i = 0; i < MEM_RECLAMATION_NUM_CLIENTS; ++i) {
+    ASSERT_OK(pool->RegisterClient(Substitute("test client $0", i), NewFileGroup(),
+        &global_reservations_, NULL, MEM_RECLAMATION_CLIENT_RESERVATION, NewProfile(),
+        &clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservation(MEM_RECLAMATION_CLIENT_RESERVATION));
+  }
+
+  // Allocate and free the whole pool's buffers on src_core to populate its free lists.
+  if (!rand_src_core) CpuTestUtil::PinToCore(src_core);
+  vector<BufferPool::BufferHandle> client_buffers[MEM_RECLAMATION_NUM_CLIENTS];
+  AllocateBuffers(pool, &clients[0], 32 * TEST_BUFFER_LEN,
+      MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_src_core);
+  AllocateBuffers(pool, &clients[1], 32 * TEST_BUFFER_LEN,
+      MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[1], rand_src_core);
+  FreeBuffers(pool, &clients[0], &client_buffers[0], rand_src_core);
+  FreeBuffers(pool, &clients[1], &client_buffers[1], rand_src_core);
+
+  // Allocate buffers again on dst_core. Make sure the size is bigger than, smaller
+  // than, and the same as the buffers we allocated earlier so that we exercise
+  // different code paths.
+  if (!rand_dst_core) CpuTestUtil::PinToCore(dst_core);
+  AllocateBuffers(pool, &clients[0], 4 * TEST_BUFFER_LEN,
+      MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_dst_core);
+  FreeBuffers(pool, &clients[0], &client_buffers[0], rand_dst_core);
+
+  // Allocate and unpin the whole pool's buffers as clean pages on src_core to populate
+  // its clean page lists.
+  if (!rand_src_core) CpuTestUtil::PinToCore(src_core);
+  vector<BufferPool::PageHandle> client_pages[MEM_RECLAMATION_NUM_CLIENTS];
+  CreatePages(pool, &clients[0], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION,
+      &client_pages[0], rand_src_core);
+  CreatePages(pool, &clients[1], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION,
+      &client_pages[1], rand_src_core);
+  for (auto& page : client_pages[0]) pool->Unpin(&clients[0], &page);
+  for (auto& page : client_pages[1]) pool->Unpin(&clients[1], &page);
+
+  // Allocate the buffers again to force reclamation of the buffers from the clean pages.
+  if (!rand_dst_core) CpuTestUtil::PinToCore(dst_core);
+  AllocateBuffers(pool, &clients[0], 4 * TEST_BUFFER_LEN,
+      MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_dst_core);
+  FreeBuffers(pool, &clients[0], &client_buffers[0]);
+
+  // Just for good measure, pin the pages again then destroy them.
+  for (auto& page : client_pages[0]) {
+    ASSERT_OK(pool->Pin(&clients[0], &page));
+    pool->DestroyPage(&clients[0], &page);
+  }
+  for (auto& page : client_pages[1]) {
+    ASSERT_OK(pool->Pin(&clients[1], &page));
+    pool->DestroyPage(&clients[1], &page);
+  }
+  for (BufferPool::ClientHandle& client : clients) pool->DeregisterClient(&client);
+}
 }
 
 int main(int argc, char** argv) {
@@ -677,11 +837,16 @@ int main(int argc, char** argv) {
   impala::LlvmCodeGen::InitializeLlvm();
   int result = 0;
   for (bool encryption : {false, true}) {
-    FLAGS_disk_spill_encryption = encryption;
-    std::cerr << "+==================================================" << std::endl
-              << "| Running tests with encryption=" << encryption << std::endl
-              << "+==================================================" << std::endl;
-    if (RUN_ALL_TESTS() != 0) result = 1;
+    for (bool numa : {false, true}) {
+      if (!numa && encryption) continue; // Not an interesting combination.
+      impala::CpuTestUtil::SetupFakeNuma(numa);
+      FLAGS_disk_spill_encryption = encryption;
+      std::cerr << "+==================================================" << std::endl
+                << "| Running tests with encryption=" << encryption << " numa=" << numa
+                << std::endl
+                << "+==================================================" << std::endl;
+      if (RUN_ALL_TESTS() != 0) result = 1;
+    }
   }
   return result;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 482db10..b4b2420 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -22,9 +22,11 @@
 #include <boost/bind.hpp>
 
 #include "common/names.h"
+#include "gutil/bits.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/bufferpool/buffer-allocator.h"
 #include "util/bit-util.h"
+#include "util/cpu-info.h"
 #include "util/runtime-profile-counters.h"
 #include "util/uid-util.h"
 
@@ -35,10 +37,16 @@ DEFINE_int32(concurrent_scratch_ios_per_device, 2,
 
 namespace impala {
 
-void BufferPool::BufferHandle::Open(uint8_t* data, int64_t len) {
+constexpr int BufferPool::LOG_MAX_BUFFER_BYTES;
+constexpr int64_t BufferPool::MAX_BUFFER_BYTES;
+
+void BufferPool::BufferHandle::Open(uint8_t* data, int64_t len, int home_core) {
+  DCHECK_LE(0, home_core);
+  DCHECK_LT(home_core, CpuInfo::GetMaxNumCores());
   client_ = nullptr;
   data_ = data;
   len_ = len;
+  home_core_ = home_core;
 }
 
 BufferPool::PageHandle::PageHandle() {
@@ -90,17 +98,13 @@ const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
 }
 
 BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
-  : allocator_(new BufferAllocator(min_buffer_len)),
-    min_buffer_len_(min_buffer_len),
-    buffer_bytes_limit_(buffer_bytes_limit),
-    buffer_bytes_remaining_(buffer_bytes_limit) {
+  : allocator_(new BufferAllocator(this, min_buffer_len, buffer_bytes_limit)),
+    min_buffer_len_(min_buffer_len) {
   DCHECK_GT(min_buffer_len, 0);
   DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
 }
 
-BufferPool::~BufferPool() {
-  DCHECK_EQ(0, clean_pages_.size());
-}
+BufferPool::~BufferPool() {}
 
 Status BufferPool::RegisterClient(const string& name, TmpFileMgr::FileGroup* file_group,
     ReservationTracker* parent_reservation, MemTracker* mem_tracker,
@@ -196,7 +200,7 @@ void BufferPool::ExtractBuffer(
 Status BufferPool::AllocateBuffer(
     ClientHandle* client, int64_t len, BufferHandle* handle) {
   RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len));
-  Status status = AllocateBufferInternal(client, len, handle);
+  Status status = allocator_->Allocate(client, len, handle);
   if (!status.ok()) {
     // Allocation failed - update client's accounting to reflect the failure.
     client->impl_->FreedBuffer(len);
@@ -204,44 +208,12 @@ Status BufferPool::AllocateBuffer(
   return status;
 }
 
-Status BufferPool::AllocateBufferInternal(
-    ClientHandle* client, int64_t len, BufferHandle* buffer) {
-  DCHECK(!buffer->is_open());
-  DCHECK_GE(len, min_buffer_len_);
-  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
-  SCOPED_TIMER(client->impl_->counters().get_buffer_time);
-
-  // If there is headroom in 'buffer_bytes_remaining_', we can just allocate a new buffer.
-  int64_t delta = DecreaseBufferBytesRemaining(len);
-  if (delta < len) {
-    // We must evict some pages to free memory before allocating.
-    int64_t to_evict = len - delta;
-    RETURN_IF_ERROR(EvictCleanPages(to_evict));
-  }
-  Status status = allocator_->Allocate(len, buffer);
-  if (!status.ok()) {
-    buffer_bytes_remaining_.Add(len);
-    return status;
-  }
-  DCHECK(buffer->is_open());
-  buffer->client_ = client;
-  return Status::OK();
-}
-
 void BufferPool::FreeBuffer(ClientHandle* client, BufferHandle* handle) {
   if (!handle->is_open()) return; // Should be idempotent.
   DCHECK_EQ(client, handle->client_);
   int64_t len = handle->len_;
-  FreeBufferInternal(handle);
-  client->impl_->FreedBuffer(len);
-}
-
-void BufferPool::FreeBufferInternal(BufferHandle* handle) {
-  DCHECK(handle->is_open());
-  int64_t buffer_len = handle->len();
   allocator_->Free(move(*handle));
-  buffer_bytes_remaining_.Add(buffer_len);
-  handle->Reset();
+  client->impl_->FreedBuffer(len);
 }
 
 Status BufferPool::TransferBuffer(ClientHandle* src_client, BufferHandle* src,
@@ -259,63 +231,12 @@ Status BufferPool::TransferBuffer(ClientHandle* src_client, BufferHandle* src,
   return Status::OK();
 }
 
-int64_t BufferPool::DecreaseBufferBytesRemaining(int64_t max_decrease) {
-  // TODO: we may want to change this policy so that we don't always use up to the limit
-  // for buffers, since this may starve other operators using non-buffer-pool memory.
-  while (true) {
-    int64_t old_value = buffer_bytes_remaining_.Load();
-    int64_t decrease = min(old_value, max_decrease);
-    int64_t new_value = old_value - decrease;
-    if (buffer_bytes_remaining_.CompareAndSwap(old_value, new_value)) {
-      return decrease;
-    }
-  }
-}
-
-void BufferPool::AddCleanPage(const unique_lock<mutex>& client_lock, Page* page) {
-  page->client->DCheckHoldsLock(client_lock);
-  lock_guard<SpinLock> cpl(clean_pages_lock_);
-  clean_pages_.Enqueue(page);
+void BufferPool::Maintenance() {
+  allocator_->Maintenance();
 }
 
-bool BufferPool::RemoveCleanPage(const unique_lock<mutex>& client_lock, Page* page) {
-  page->client->DCheckHoldsLock(client_lock);
-  lock_guard<SpinLock> cpl(clean_pages_lock_);
-  return clean_pages_.Remove(page);
-}
-
-Status BufferPool::EvictCleanPages(int64_t bytes_to_evict) {
-  DCHECK_GE(bytes_to_evict, 0);
-  vector<BufferHandle> buffers;
-  int64_t bytes_found = 0;
-  {
-    lock_guard<SpinLock> cpl(clean_pages_lock_);
-    while (bytes_found < bytes_to_evict) {
-      Page* page = clean_pages_.Dequeue();
-      if (page == NULL) break;
-      lock_guard<SpinLock> pl(page->buffer_lock);
-      bytes_found += page->len;
-      buffers.emplace_back(move(page->buffer));
-    }
-  }
-
-  // Free buffers after releasing all the locks. Do this regardless of success to avoid
-  // leaking buffers.
-  for (BufferHandle& buffer : buffers) allocator_->Free(move(buffer));
-  if (bytes_found < bytes_to_evict) {
-    // The buffer pool should not be overcommitted so this should only happen if there
-    // is an accounting error. Add any freed buffers back to 'buffer_bytes_remaining_'
-    // to restore consistency.
-    buffer_bytes_remaining_.Add(bytes_found);
-    return Status(TErrorCode::INTERNAL_ERROR,
-        Substitute("Tried to evict $0 bytes but only $1 bytes of clean 
pages:\n$2",
-                      bytes_to_evict, bytes_found, DebugString()));
-  }
-  // Update 'buffer_bytes_remaining_' with any excess.
-  if (bytes_found > bytes_to_evict) {
-    buffer_bytes_remaining_.Add(bytes_found - bytes_to_evict);
-  }
-  return Status::OK();
+void BufferPool::ReleaseMemory(int64_t bytes_to_free) {
+  allocator_->ReleaseMemory(bytes_to_free);
 }
 
 bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) {
@@ -348,7 +269,10 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     buffers_allocated_bytes_(0) {
   reservation_.InitChildTracker(
       profile, parent_reservation, mem_tracker, reservation_limit);
-  counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
+  counters_.alloc_time = ADD_TIMER(profile, "BufferPoolAllocTime");
+  counters_.num_allocations = ADD_COUNTER(profile, "BufferPoolAllocations", TUnit::UNIT);
+  counters_.bytes_alloced =
+      ADD_COUNTER(profile, "BufferPoolAllocationBytes", TUnit::BYTES);
   counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadIoWaitTime");
   counters_.read_io_ops = ADD_COUNTER(profile, "BufferPoolReadIoOps", TUnit::UNIT);
   counters_.bytes_read = ADD_COUNTER(profile, "BufferPoolReadIoBytes", TUnit::BYTES);
@@ -389,7 +313,7 @@ void BufferPool::Client::DestroyPageInternal(
       // Let the write complete, if in flight.
       WaitForWrite(&cl, page);
       // If clean, remove it from the clean pages list. If evicted, this is a no-op.
-      pool_->RemoveCleanPage(cl, page);
+      pool_->allocator_->RemoveCleanPage(cl, out_buffer != nullptr, page);
     }
     DCHECK(!page->in_queue());
     --num_pages_;
@@ -404,7 +328,7 @@ void BufferPool::Client::DestroyPageInternal(
     *out_buffer = std::move(page->buffer);
     buffers_allocated_bytes_ += out_buffer->len();
   } else if (page->buffer.is_open()) {
-    pool_->FreeBufferInternal(&page->buffer);
+    pool_->allocator_->Free(move(page->buffer));
   }
   delete page;
   handle->Reset();
@@ -431,20 +355,6 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
   // Propagate any write errors that occurred for this client.
   RETURN_IF_ERROR(write_status_);
 
-  // Check if the page is evicted first. This is not necessary for correctness, since
-  // we re-check this later, but by doing it upfront we avoid grabbing the global
-  // 'clean_pages_lock_' in the common case.
-  bool evicted;
-  {
-    lock_guard<SpinLock> pl(page->buffer_lock);
-    evicted = !page->buffer.is_open();
-  }
-  if (evicted) {
-    // We may need to clean some pages to allocate a buffer for the evicted page.
-    RETURN_IF_ERROR(CleanPages(&cl, page->len));
-    return MoveEvictedToPinned(&cl, client, handle);
-  }
-
   if (dirty_unpinned_pages_.Remove(page)) {
     // No writes were initiated for the page - just move it back to the pinned state.
     pinned_pages_.Enqueue(page);
@@ -460,19 +370,17 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
   // At this point we need to either reclaim a clean page or allocate a new buffer.
   // We may need to clean some pages to do so.
   RETURN_IF_ERROR(CleanPages(&cl, page->len));
-  if (pool_->RemoveCleanPage(cl, page)) {
-    // The clean page still has an associated buffer. Just clean up the write, restore
-    // the data, and move the page back to the pinned state.
+  if (pool_->allocator_->RemoveCleanPage(cl, true, page)) {
+    // The clean page still has an associated buffer. Restore the data, and move the page
+    // back to the pinned state.
     pinned_pages_.Enqueue(page);
     DCHECK(page->buffer.is_open());
     DCHECK(page->write_handle != NULL);
     // Don't need on-disk data.
     cl.unlock(); // Don't block progress for other threads operating on other pages.
-    return file_group_->CancelWriteAndRestoreData(
-        move(page->write_handle), page->buffer.mem_range());
+    return file_group_->RestoreData(move(page->write_handle), page->buffer.mem_range());
   }
-  // If the page wasn't in the global clean pages list, it must have been evicted after
-  // the earlier 'evicted' check.
+  // If the page wasn't in the clean pages list, it must have been evicted.
   return MoveEvictedToPinned(&cl, client, handle);
 }
 
@@ -486,7 +394,7 @@ Status BufferPool::Client::MoveEvictedToPinned(
   // can modify evicted pages.
   client_lock->unlock();
   BufferHandle buffer;
-  RETURN_IF_ERROR(pool_->AllocateBufferInternal(client, page->len, &page->buffer));
+  RETURN_IF_ERROR(pool_->allocator_->Allocate(client, page->len, &page->buffer));
   COUNTER_ADD(counters().bytes_read, page->len);
   COUNTER_ADD(counters().read_io_ops, 1);
   {
@@ -604,7 +512,7 @@ void BufferPool::Client::WriteCompleteCallback(Page* page, const Status& write_s
     // Move to clean pages list even if an error was encountered - the buffer can be
     // repurposed by other clients and 'write_status_' must be checked by this client
     // before reading back the bad data.
-    pool_->AddCleanPage(cl, page);
+    pool_->allocator_->AddCleanPage(cl, page);
     WriteDirtyPagesAsync(); // Start another asynchronous write if needed.
 
     // Notify before releasing lock to avoid race with Page and Client destruction.
@@ -680,14 +588,8 @@ string BufferPool::BufferHandle::DebugString() const {
 
 string BufferPool::DebugString() {
   stringstream ss;
-  ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
-     << " buffer_bytes_limit: " << buffer_bytes_limit_
-     << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n"
-     << "  Clean pages: ";
-  {
-    lock_guard<SpinLock> cpl(clean_pages_lock_);
-    clean_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
-  }
+  ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_ << 
"\n"
+     << allocator_->DebugString();
   return ss.str();
 }
 }
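
With the eviction and accounting logic moved out of BufferPool, the pool's public surface for memory management reduces to the two pass-through calls shown above. A hypothetical usage sketch only (this wiring is not part of the excerpt):

  // Periodically, from a maintenance task: trim per-core free lists that have grown
  // beyond what recent allocation patterns need.
  pool->Maintenance();
  // Under memory pressure, e.g. from a memory-release callback: hand free buffers and
  // clean pages back to the system allocator.
  pool->ReleaseMemory(bytes_to_free);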
