Repository: incubator-impala
Updated Branches:
  refs/heads/master 8bdfe0320 -> 42002b91c


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h 
b/be/src/runtime/bufferpool/buffer-pool.h
index da8ef3c..7770fff 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -20,11 +20,13 @@
 
 #include <stdint.h>
 #include <string>
+#include <vector>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/locks.hpp>
 
 #include "common/atomic.h"
 #include "common/compiler-util.h"
+#include "common/object-pool.h"
 #include "common/status.h"
 #include "gutil/macros.h"
 #include "runtime/tmp-file-mgr.h"
@@ -35,9 +37,9 @@
 
 namespace impala {
 
-class BufferAllocator;
 class ReservationTracker;
 class RuntimeProfile;
+class SystemAllocator;
 
 /// A buffer pool that manages memory buffers for all queries in an Impala 
daemon.
 /// The buffer pool enforces buffer reservations, limits, and implements 
policies
@@ -57,9 +59,6 @@ class RuntimeProfile;
 /// with the buffer). Unless otherwise noted, it is not safe to invoke 
concurrent buffer
 /// pool operations for the same client.
 ///
-/// TODO:
-/// * Decide on, document, and enforce upper limits on page size.
-///
 /// Pages, Buffers and Pinning
 /// ==========================
 /// * A page is a logical block of memory that can reside in memory or on disk.
@@ -146,9 +145,9 @@ class RuntimeProfile;
 /// operations with the same Client, PageHandle or BufferHandle.
 class BufferPool : public CacheLineAligned {
  public:
+  class BufferAllocator;
   class BufferHandle;
   class ClientHandle;
-  class Client;
   class PageHandle;
 
   /// Constructs a new buffer pool.
@@ -240,79 +239,44 @@ class BufferPool : public CacheLineAligned {
   Status TransferBuffer(ClientHandle* src_client, BufferHandle* src,
       ClientHandle* dst_client, BufferHandle* dst) WARN_UNUSED_RESULT;
 
+  /// Try to release at least 'bytes_to_free' bytes of memory to the system 
allocator.
+  /// TODO: once IMPALA-4834 is done and all large allocations are served from 
the buffer
+  /// pool, this may not be necessary.
+  void ReleaseMemory(int64_t bytes_to_free);
+
+  /// Called periodically by a maintenance thread to released unneeded memory 
back to the
+  /// system allocator.
+  void Maintenance();
+
   /// Print a debug string with the state of the buffer pool.
   std::string DebugString();
 
   int64_t min_buffer_len() const { return min_buffer_len_; }
-  int64_t buffer_bytes_limit() const { return buffer_bytes_limit_; }
+
+  /// Generous upper bounds on page and buffer size and the number of different
+  /// power-of-two buffer sizes.
+  static constexpr int LOG_MAX_BUFFER_BYTES = 48;
+  static constexpr int64_t MAX_BUFFER_BYTES = 1L << LOG_MAX_BUFFER_BYTES;
+
+ protected:
+  friend class BufferPoolTest;
+  /// Test helper: get a reference to the allocator.
+  BufferAllocator* allocator() { return allocator_.get(); }
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferPool);
+  class Client;
+  class FreeBufferArena;
   class PageList;
   struct Page;
-  /// Allocate a buffer of length 'len'. Assumes that the client's reservation 
has already
-  /// been consumed for the buffer. Returns an error if the pool is unable to 
fulfill the
-  /// reservation. This function may acquire 'clean_pages_lock_' and 
Page::lock so
-  /// no locks lower in the lock acquisition order (see 
buffer-pool-internal.h) should be
-  /// held by the caller.
-  Status AllocateBufferInternal(
-      ClientHandle* client, int64_t len, BufferHandle* buffer) 
WARN_UNUSED_RESULT;
-
-  /// Frees 'buffer', which must be open before calling. Closes 'buffer' and 
updates
-  /// internal state but does not release to any reservation.
-  void FreeBufferInternal(BufferHandle* buffer);
-
-  /// Decrease 'buffer_bytes_remaining_' by up to 'len', down to a minimum of 
0.
-  /// Returns the amount it was decreased by.
-  int64_t DecreaseBufferBytesRemaining(int64_t max_decrease);
-
-  /// Adds a clean page 'page' to the global clean pages list, unless the page 
is in the
-  /// process of being cleaned up. Caller must hold the page's client's lock 
via
-  /// 'client_lock' so that moving the page between a client list and the 
global free
-  /// page list is atomic. Caller must not hold 'clean_pages_lock_' or any 
Page::lock.
-  void AddCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* 
page);
-
-  /// Removes a clean page 'page' from the global clean pages list, if 
present. Returns
-  /// true if it was present. Caller must hold the page's client's lock via
-  /// 'client_lock' so that moving the page between list is atomic and there 
is not a
-  /// window so that moving the page between a client list and the global free 
page list
-  /// is atomic. Caller must not hold 'clean_pages_lock_' or any Page::lock.
-  bool RemoveCleanPage(const boost::unique_lock<boost::mutex>& client_lock, 
Page* page);
-
-  /// Evict at least 'bytes_to_evict' bytes of clean pages and free the 
associated
-  /// buffers with 'allocator_'. Any bytes freed in excess of 'bytes_to_evict' 
are
-  /// added to 'buffer_bytes_remaining_.'
-  ///
-  /// Returns an error and adds any freed bytes to 'buffer_bytes_remaining_' 
if not
-  /// enough bytes could be evicted. This will only happen if there is an 
internal
-  /// bug: if all clients write out enough dirty pages to stay within their 
reservation,
-  /// then there should always be enough clean pages.
-  Status EvictCleanPages(int64_t bytes_to_evict);
 
-  /// Allocator for allocating and freeing all buffer memory.
+  /// Allocator for allocating and freeing all buffer memory and managing 
lists of free
+  /// buffers and clean pages.
   boost::scoped_ptr<BufferAllocator> allocator_;
 
   /// The minimum length of a buffer in bytes. All buffers and pages are a 
power-of-two
   /// multiple of this length. This is always a power of two.
   const int64_t min_buffer_len_;
-
-  /// The maximum physical memory in bytes that can be used for buffers.
-  const int64_t buffer_bytes_limit_;
-
-  /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used 
for
-  /// allocating new buffers. Must be updated atomically before a new buffer is
-  /// allocated or after an existing buffer is freed.
-  /// TODO: reconsider this to avoid all threads contending on this one value.
-  AtomicInt64 buffer_bytes_remaining_;
-
-  /// Unpinned pages that have had their contents written to disk. These pages 
can be
-  /// evicted to allocate a buffer for any client. Pages are evicted in FIFO 
order,
-  /// so that pages are evicted in approximately the same order that the 
clients wrote
-  /// them to disk. 'clean_pages_lock_' protects 'clean_pages_'.
-  /// TODO: consider breaking up by page size
-  /// TODO: consider breaking up by core/NUMA node to improve locality
-  alignas(CACHE_LINE_SIZE) SpinLock clean_pages_lock_;
-  InternalList<Page> clean_pages_;
 };
 
 /// External representation of a client of the BufferPool. Clients are used for
@@ -390,17 +354,17 @@ class BufferPool::BufferHandle {
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferHandle);
   friend class BufferPool;
-  friend class BufferAllocator;
+  friend class SystemAllocator;
 
   /// Internal helper to set the handle to an opened state.
-  void Open(uint8_t* data, int64_t len);
+  void Open(uint8_t* data, int64_t len, int home_core);
 
   /// Internal helper to reset the handle to an unopened state. Inlined to 
make moving
   /// efficient.
   inline void Reset();
 
   /// The client the buffer handle belongs to, used to validate that the 
correct client
-  /// is provided in BufferPool method calls.
+  /// is provided in BufferPool method calls. Set to NULL if the buffer is in 
a free list.
   const ClientHandle* client_;
 
   /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed.
@@ -408,6 +372,10 @@ class BufferPool::BufferHandle {
 
   /// Length of the buffer in bytes.
   int64_t len_;
+
+  /// The CPU core that the buffer was allocated from - used to determine 
which arena
+  /// it will be added to.
+  int home_core_;
 };
 
 /// The handle for a page used by clients of the BufferPool. Each PageHandle 
should
@@ -477,6 +445,7 @@ inline BufferPool::BufferHandle& 
BufferPool::BufferHandle::operator=(
   client_ = src.client_;
   data_ = src.data_;
   len_ = src.len_;
+  home_core_ = src.home_core_;
   src.Reset();
   return *this;
 }
@@ -485,6 +454,7 @@ inline void BufferPool::BufferHandle::Reset() {
   client_ = NULL;
   data_ = NULL;
   len_ = -1;
+  home_core_ = -1;
 }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/free-list-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/free-list-test.cc 
b/be/src/runtime/bufferpool/free-list-test.cc
index 190faa9..7cb80b4 100644
--- a/be/src/runtime/bufferpool/free-list-test.cc
+++ b/be/src/runtime/bufferpool/free-list-test.cc
@@ -20,7 +20,9 @@
 
 #include "common/object-pool.h"
 #include "runtime/bufferpool/free-list.h"
+#include "runtime/bufferpool/system-allocator.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 
 #include "common/names.h"
 
@@ -29,8 +31,8 @@ namespace impala {
 class FreeListTest : public ::testing::Test {
  protected:
   virtual void SetUp() override {
-    allocator_ = obj_pool_.Add(new BufferAllocator(MIN_BUFFER_LEN));
-    SeedRng();
+    allocator_ = obj_pool_.Add(new SystemAllocator(MIN_BUFFER_LEN));
+    RandTestUtil::SeedRng("FREE_LIST_TEST_SEED", &rng_);
   }
 
   virtual void TearDown() override {
@@ -38,16 +40,8 @@ class FreeListTest : public ::testing::Test {
     obj_pool_.Clear();
   }
 
-  /// Seed 'rng_' with a seed either for the environment or based on the 
current time.
-  void SeedRng() {
-    const char* seed_str = getenv("FREE_LIST_TEST_SEED");
-    int64_t seed = seed_str != nullptr ? atoi(seed_str) : time(nullptr);
-    LOG(INFO) << "Random seed: " << seed;
-    rng_.seed(seed);
-  }
-
-  void AllocateBuffers(int num_buffers, int64_t buffer_len,
-      vector<BufferHandle>* buffers) {
+  void AllocateBuffers(
+      int num_buffers, int64_t buffer_len, vector<BufferHandle>* buffers) {
     for (int i = 0; i < num_buffers; ++i) {
       BufferHandle buffer;
       ASSERT_OK(allocator_->Allocate(buffer_len, &buffer));
@@ -69,11 +63,10 @@ class FreeListTest : public ::testing::Test {
     buffers->clear();
   }
 
-  void FreeBuffers(vector<BufferHandle>* buffers) {
-    for (BufferHandle& buffer : *buffers) {
+  void FreeBuffers(vector<BufferHandle>&& buffers) {
+    for (BufferHandle& buffer : buffers) {
       allocator_->Free(move(buffer));
     }
-    buffers->clear();
   }
 
   const static int MIN_BUFFER_LEN = 1024;
@@ -85,7 +78,7 @@ class FreeListTest : public ::testing::Test {
   ObjectPool obj_pool_;
 
   /// The buffer allocator, owned by 'obj_pool_'.
-  BufferAllocator* allocator_;
+  SystemAllocator* allocator_;
 };
 
 const int FreeListTest::MIN_BUFFER_LEN;
@@ -115,7 +108,8 @@ TEST_F(FreeListTest, SmallList) {
       std::shuffle(buffers.begin(), buffers.end(), rng_);
       AddFreeBuffers(&small_list, &buffers);
       // Shrink list down to LIST_SIZE.
-      small_list.FreeBuffers(allocator_, max<int64_t>(0, small_list.Size() - 
LIST_SIZE));
+      FreeBuffers(
+          small_list.GetBuffersToFree(max<int64_t>(0, small_list.Size() - 
LIST_SIZE)));
 
       // The LIST_SIZE buffers with the lowest address should be retained, and 
the
       // remaining buffers should have been freed.
@@ -125,7 +119,7 @@ TEST_F(FreeListTest, SmallList) {
         buffers.push_back(move(buffer));
       }
       ASSERT_FALSE(small_list.PopFreeBuffer(&buffer));
-      FreeBuffers(&buffers);
+      FreeBuffers(move(buffers));
     }
   }
 }
@@ -148,7 +142,7 @@ TEST_F(FreeListTest, ReturnOrder) {
       AddFreeBuffers(&list, &buffers);
 
       // Free buffers. Only the buffers with the high addresses should be 
freed.
-      list.FreeBuffers(allocator_, max<int64_t>(0, list.Size() - LIST_SIZE));
+      FreeBuffers(list.GetBuffersToFree(max<int64_t>(0, list.Size() - 
LIST_SIZE)));
 
       // Validate that the buffers with lowest addresses are returned in 
ascending order.
       BufferHandle buffer;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/free-list.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/free-list.h 
b/be/src/runtime/bufferpool/free-list.h
index 6cec0e8..ec1057d 100644
--- a/be/src/runtime/bufferpool/free-list.h
+++ b/be/src/runtime/bufferpool/free-list.h
@@ -26,7 +26,6 @@
 
 #include "common/logging.h"
 #include "gutil/macros.h"
-#include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/buffer-pool.h"
 
 namespace impala {
@@ -75,28 +74,21 @@ class FreeList {
     std::push_heap(free_list_.begin(), free_list_.end(), HeapCompare);
   }
 
-  /// Frees all the buffers in the list with 'allocator'. Returns the number 
of bytes
-  /// freed.
-  int64_t FreeAll(BufferAllocator* allocator) {
-    return FreeBuffers(allocator, free_list_.size());
-  }
-
-  /// Free 'num_buffers' buffers from the list with 'allocator'. Returns the 
number of
-  /// bytes freed. The average time complexity is n log n, where n is the 
current size of
-  /// the list.
-  int64_t FreeBuffers(BufferAllocator* allocator, int64_t num_buffers) {
+  /// Get the 'num_buffers' buffers with the highest memory address from the 
list to
+  /// free. The average time complexity is n log n, where n is the current 
size of the
+  /// list.
+  vector<BufferHandle> GetBuffersToFree(int64_t num_buffers) {
+    vector<BufferHandle> buffers;
     DCHECK_LE(num_buffers, free_list_.size());
     // Sort the list so we can free the buffers with higher memory addresses.
     // Note that the sorted list is still a valid min-heap.
     std::sort(free_list_.begin(), free_list_.end(), SortCompare);
 
-    int64_t bytes_freed = 0;
     for (int64_t i = 0; i < num_buffers; ++i) {
-      bytes_freed += free_list_.back().len();
-      allocator->Free(std::move(free_list_.back()));
+      buffers.emplace_back(std::move(free_list_.back()));
       free_list_.pop_back();
     }
-    return bytes_freed;
+    return buffers;
   }
 
   /// Returns the number of buffers currently in the list.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc 
b/be/src/runtime/bufferpool/suballocator-test.cc
index 13bc597..ea515fe 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -29,6 +29,7 @@
 #include "runtime/bufferpool/suballocator.h"
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 #include "util/bit-util.h"
 
 #include "common/names.h"
@@ -43,7 +44,7 @@ namespace impala {
 class SuballocatorTest : public ::testing::Test {
  public:
   virtual void SetUp() override {
-    SeedRng();
+    RandTestUtil::SeedRng("SUBALLOCATOR_TEST_SEED", &rng_);
     profile_.reset(new RuntimeProfile(&obj_pool_, "test profile"));
   }
 
@@ -63,19 +64,6 @@ class SuballocatorTest : public ::testing::Test {
   const static int64_t TEST_BUFFER_LEN = Suballocator::MIN_ALLOCATION_BYTES * 
16;
 
  protected:
-  /// Seed 'rng_' with a seed either for the environment or based on the 
current time.
-  void SeedRng() {
-    const char* seed_str = getenv("SUBALLOCATOR_TEST_SEED");
-    int64_t seed;
-    if (seed_str != nullptr) {
-      seed = atoi(seed_str);
-    } else {
-      seed = time(nullptr);
-    }
-    LOG(INFO) << "Random seed: " << seed;
-    rng_.seed(seed);
-  }
-
   /// Initialize 'buffer_pool_' and 'global_reservation_' with a limit of 
'total_mem'
   /// bytes of buffers of minimum length 'min_buffer_len'.
   void InitPool(int64_t min_buffer_len, int total_mem) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/suballocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator.h 
b/be/src/runtime/bufferpool/suballocator.h
index 53f4ef5..e6834ad 100644
--- a/be/src/runtime/bufferpool/suballocator.h
+++ b/be/src/runtime/bufferpool/suballocator.h
@@ -89,10 +89,10 @@ class Suballocator {
   /// failed Allocate() call).
   void Free(std::unique_ptr<Suballocation> allocation);
 
-  /// Generous upper bounds on the max allocation size and the number of 
different
+  /// Upper bounds on the max allocation size and the number of different
   /// power-of-two allocation sizes. Used to bound the number of free lists.
-  static constexpr int LOG_MAX_ALLOCATION_BYTES = 48;
-  static constexpr int64_t MAX_ALLOCATION_BYTES = 1L << 
LOG_MAX_ALLOCATION_BYTES;
+  static constexpr int LOG_MAX_ALLOCATION_BYTES = 
BufferPool::LOG_MAX_BUFFER_BYTES;
+  static constexpr int64_t MAX_ALLOCATION_BYTES = BufferPool::MAX_BUFFER_BYTES;
 
   /// Don't support allocations less than 4kb to avoid high overhead.
   static constexpr int LOG_MIN_ALLOCATION_BYTES = 12;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/system-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/system-allocator.cc 
b/be/src/runtime/bufferpool/system-allocator.cc
new file mode 100644
index 0000000..0ea429a
--- /dev/null
+++ b/be/src/runtime/bufferpool/system-allocator.cc
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/bufferpool/system-allocator.h"
+
+#include "util/bit-util.h"
+
+namespace impala {
+
+SystemAllocator::SystemAllocator(int64_t min_buffer_len)
+  : min_buffer_len_(min_buffer_len) {
+  DCHECK(BitUtil::IsPowerOf2(min_buffer_len));
+}
+
+Status SystemAllocator::Allocate(int64_t len, BufferPool::BufferHandle* 
buffer) {
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_LE(len, BufferPool::MAX_BUFFER_BYTES);
+  DCHECK(BitUtil::IsPowerOf2(len)) << len;
+
+  uint8_t* alloc = reinterpret_cast<uint8_t*>(malloc(len));
+  if (alloc == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len);
+  buffer->Open(alloc, len, CpuInfo::GetCurrentCore());
+  return Status::OK();
+}
+
+void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) {
+  free(buffer.data());
+  buffer.Reset(); // Avoid DCHECK in ~BufferHandle().
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/system-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/system-allocator.h 
b/be/src/runtime/bufferpool/system-allocator.h
new file mode 100644
index 0000000..8b4a544
--- /dev/null
+++ b/be/src/runtime/bufferpool/system-allocator.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_SYSTEM_ALLOCATOR_H
+#define IMPALA_RUNTIME_SYSTEM_ALLOCATOR_H
+
+#include "common/status.h"
+
+#include "runtime/bufferpool/buffer-pool.h"
+
+namespace impala {
+
+/// The underlying memory allocator for the buffer pool that allocates buffer 
memory from
+/// the system. All buffers are allocated through the BufferPool's 
SystemAllocator. The
+/// allocator only handles allocating buffers that are power-of-two multiples 
of the
+/// minimum buffer length.
+///
+/// TODO:
+/// * Allocate memory with mmap() instead of malloc().
+class SystemAllocator {
+ public:
+  SystemAllocator(int64_t min_buffer_len);
+
+  /// Allocate memory for a buffer of 'len' bytes. 'len' must be a 
power-of-two multiple
+  /// of the minimum buffer length.
+  Status Allocate(int64_t len, BufferPool::BufferHandle* buffer) 
WARN_UNUSED_RESULT;
+
+  /// Free the memory for a previously-allocated buffer.
+  void Free(BufferPool::BufferHandle&& buffer);
+
+ private:
+  const int64_t min_buffer_len_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 5d9ced1..b973a84 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -380,9 +380,6 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   DCHECK(process_mem_tracker != NULL);
   free_buffer_mem_tracker_.reset(
       new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
-  // If we hit the process limit, see if we can reclaim some memory by removing
-  // previously allocated (but unused) io buffers.
-  process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this));
 
   for (int i = 0; i < disk_queues_.size(); ++i) {
     disk_queues_[i] = new DiskQueue(i);
@@ -759,12 +756,17 @@ DiskIoMgr::BufferDescriptor* 
DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read
   return GetBufferDesc(reader, reader->mem_tracker_, range, buffer, 
buffer_size);
 }
 
-void DiskIoMgr::GcIoBuffers() {
+void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
   unique_lock<mutex> lock(free_buffers_lock_);
   int buffers_freed = 0;
   int bytes_freed = 0;
+  // Free small-to-large to avoid retaining many small buffers and fragmenting 
memory.
   for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    for (uint8_t* buffer : free_buffers_[idx]) {
+    std::list<uint8_t*>* free_buffers = &free_buffers_[idx];
+    while (
+        !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= 
bytes_to_free)) {
+      uint8_t* buffer = free_buffers->front();
+      free_buffers->pop_front();
       int64_t buffer_size = (1LL << idx) * min_buffer_size_;
       delete[] buffer;
       free_buffer_mem_tracker_->Release(buffer_size);
@@ -773,7 +775,7 @@ void DiskIoMgr::GcIoBuffers() {
       ++buffers_freed;
       bytes_freed += buffer_size;
     }
-    free_buffers_[idx].clear();
+    if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
   }
 
   if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
@@ -783,7 +785,7 @@ void DiskIoMgr::GcIoBuffers() {
     ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
   }
   if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
-    ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->set_value(0);
+    ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
   }
 }
 
@@ -1125,7 +1127,7 @@ DiskIoMgr::BufferDescriptor* 
DiskIoMgr::TryAllocateNextBufferForRange(
   DCHECK(reader->mem_tracker_ != NULL);
   bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
   if (!enough_memory) {
-    // Low memory, GC and try again.
+    // Low memory, GC all the buffers and try again.
     GcIoBuffers();
     enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b7222cb..b70c9c8 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -774,6 +774,10 @@ class DiskIoMgr {
   /// later reuse.
   void CacheOrCloseFileHandle(const char* fname, HdfsCachedFileHandle* fid, 
bool close);
 
+  /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the 
buffers if
+  /// 'bytes_to_free' is -1.
+  void GcIoBuffers(int64_t bytes_to_free = -1);
+
   /// Default ready buffer queue capacity. This constant doesn't matter too 
much
   /// since the system dynamically adjusts.
   static const int DEFAULT_QUEUE_CAPACITY;
@@ -902,12 +906,6 @@ class DiskIoMgr {
   /// Returns a buffer desc object which can now be used for another reader.
   void ReturnBufferDesc(BufferDescriptor* desc);
 
-  /// Garbage collect all unused io buffers. This is currently only triggered 
when the
-  /// process wide limit is hit. This is not good enough. While it is 
sufficient for
-  /// the IoMgr, other components do not trigger this GC.
-  /// TODO: make this run periodically?
-  void GcIoBuffers();
-
   /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be 
NULL), either
   /// freeing it or returning it to 'free_buffers_'. Memory tracking is 
updated to
   /// reflect the transfer of ownership from desc->mem_tracker_ to the disk 
I/O mgr.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index dd27a91..1fa863e 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -313,12 +313,6 @@ Status ExecEnv::StartServices() {
   // Limit of -1 means no memory limit.
   mem_tracker_.reset(new MemTracker(TcmallocMetric::PHYSICAL_BYTES_RESERVED,
       bytes_limit > 0 ? bytes_limit : -1, "Process"));
-
-  // Since tcmalloc does not free unused memory, we may exceed the process mem 
limit even
-  // if Impala is not actually using that much memory. Add a callback to free 
any unused
-  // memory if we hit the process limit.
-  mem_tracker_->AddGcFunction(boost::bind(&MallocExtension::ReleaseFreeMemory,
-                                          MallocExtension::instance()));
 #else
   // tcmalloc metrics aren't defined in ASAN builds, just use the default 
behavior to
   // track process memory usage (sum of all children trackers).
@@ -338,6 +332,22 @@ Status ExecEnv::StartServices() {
 
   RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
 
+  mem_tracker_->AddGcFunction(
+      [this](int64_t bytes_to_free) { 
disk_io_mgr_->GcIoBuffers(bytes_to_free); });
+
+  // TODO: IMPALA-3200: register BufferPool::ReleaseMemory() as GC function.
+
+#ifndef ADDRESS_SANITIZER
+  // Since tcmalloc does not free unused memory, we may exceed the process mem 
limit even
+  // if Impala is not actually using that much memory. Add a callback to free 
any unused
+  // memory if we hit the process limit. TCMalloc GC must run last, because 
other GC
+  // functions may have released memory to TCMalloc, and TCMalloc may have 
cached it
+  // instead of releasing it to the system.
+  mem_tracker_->AddGcFunction([](int64_t bytes_to_free) {
+    MallocExtension::instance()->ReleaseToSystem(bytes_to_free);
+  });
+#endif
+
   // Start services in order to ensure that dependencies between them are met
   if (enable_webserver_) {
     AddDefaultUrlCallbacks(webserver_.get(), mem_tracker_.get(), 
metrics_.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index a48ede9..9e3775d 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -292,26 +292,37 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, 
const std::string& deta
   return status;
 }
 
+void MemTracker::AddGcFunction(GcFunction f) {
+  gc_functions_.push_back(f);
+}
+
 bool MemTracker::GcMemory(int64_t max_consumption) {
   if (max_consumption < 0) return true;
   lock_guard<mutex> l(gc_lock_);
   if (consumption_metric_ != NULL) 
consumption_->Set(consumption_metric_->value());
-  uint64_t pre_gc_consumption = consumption();
+  int64_t pre_gc_consumption = consumption();
   // Check if someone gc'd before us
   if (pre_gc_consumption < max_consumption) return false;
   if (num_gcs_metric_ != NULL) num_gcs_metric_->Increment(1);
 
+  int64_t curr_consumption = pre_gc_consumption;
   // Try to free up some memory
   for (int i = 0; i < gc_functions_.size(); ++i) {
-    gc_functions_[i]();
+    // Try to free up the amount we are over plus some extra so that we don't 
have to
+    // immediately GC again. Don't free all the memory since that can be 
unnecessarily
+    // expensive.
+    const int64_t EXTRA_BYTES_TO_FREE = 512L * 1024L * 1024L;
+    int64_t bytes_to_free = curr_consumption - max_consumption + 
EXTRA_BYTES_TO_FREE;
+    gc_functions_[i](bytes_to_free);
     if (consumption_metric_ != NULL) RefreshConsumptionFromMetric();
-    if (consumption() <= max_consumption) break;
+    curr_consumption = consumption();
+    if (max_consumption - curr_consumption <= EXTRA_BYTES_TO_FREE) break;
   }
 
   if (bytes_freed_by_last_gc_metric_ != NULL) {
-    bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - 
consumption());
+    bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - 
curr_consumption);
   }
-  return consumption() > max_consumption;
+  return curr_consumption > max_consumption;
 }
 
 void MemTracker::GcTcmalloc() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index acf3b46..863d3c7 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -294,14 +294,17 @@ class MemTracker {
 
   MemTracker* parent() const { return parent_; }
 
-  /// Signature for function that can be called to free some memory after 
limit is reached.
-  /// See the class header for further details on what these functions should 
do.
-  typedef boost::function<void ()> GcFunction;
-
-  /// Add a function 'f' to be called if the limit is reached.
+  /// Signature for function that can be called to free some memory after 
limit is
+  /// reached. The function should try to free at least 'bytes_to_free' bytes 
of
+  /// memory. See the class header for further details on the expected 
behaviour of
+  /// these functions.
+  typedef std::function<void(int64_t bytes_to_free)> GcFunction;
+
+  /// Add a function 'f' to be called if the limit is reached, if none of the 
other
+  /// previously-added GC functions were successful at freeing up enough 
memory.
   /// 'f' does not need to be thread-safe as long as it is added to only one 
MemTracker.
   /// Note that 'f' must be valid for the lifetime of this MemTracker.
-  void AddGcFunction(GcFunction f) { gc_functions_.push_back(f); }
+  void AddGcFunction(GcFunction f);
 
   /// Register this MemTracker's metrics. Each key will be of the form
   /// "<prefix>.<metric name>".

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc 
b/be/src/runtime/tmp-file-mgr-test.cc
index b220fff..dee3c64 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -481,7 +481,9 @@ TEST_F(TmpFileMgrTest, TestEncryptionDuringCancellation) {
   string file_path = handle->TmpFilePath();
 
   // Cancel the write - prior to the IMPALA-4820 fix decryption could race 
with the write.
-  ASSERT_OK(file_group.CancelWriteAndRestoreData(move(handle), 
data_mem_range));
+  handle->Cancel();
+  handle->WaitForWrite();
+  ASSERT_OK(file_group.RestoreData(move(handle), data_mem_range));
   WaitForCallbacks(1);
 
   // Read the data from the scratch file and check that the plaintext isn't 
present.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index c939d88..abba192 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -407,13 +407,11 @@ Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, 
MemRange buffer) {
   return Status::OK();
 }
 
-Status TmpFileMgr::FileGroup::CancelWriteAndRestoreData(
+Status TmpFileMgr::FileGroup::RestoreData(
     unique_ptr<WriteHandle> handle, MemRange buffer) {
   DCHECK_EQ(handle->write_range_->data(), buffer.data());
   DCHECK_EQ(handle->len(), buffer.len());
-  handle->Cancel();
-
-  handle->WaitForWrite();
+  DCHECK(!handle->write_in_flight_);
   // Decrypt after the write is finished, so that we don't accidentally write 
decrypted
   // data to disk.
   Status status;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index cab2d87..d66c527 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -117,8 +117,7 @@ class TmpFileMgr {
     /// a different thread when the write completes successfully or 
unsuccessfully or is
     /// cancelled.
     ///
-    /// 'handle' must be destroyed by passing the DestroyWriteHandle() or
-    /// CancelWriteAndRestoreData().
+    /// 'handle' must be destroyed by passing the DestroyWriteHandle() or 
RestoreData().
     Status Write(MemRange buffer, WriteDoneCallback cb,
         std::unique_ptr<WriteHandle>* handle) WARN_UNUSED_RESULT;
 
@@ -127,11 +126,11 @@ class TmpFileMgr {
     /// after a write successfully completes.
     Status Read(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
 
-    /// Cancels the write referenced by 'handle' and destroy associate 
resources. Also
-    /// restore the original data in the 'buffer' passed to Write(), 
decrypting or
-    /// decompressing as necessary. The cancellation always succeeds, but an 
error
-    /// is returned if restoring the data fails.
-    Status CancelWriteAndRestoreData(
+    /// Restore the original data in the 'buffer' passed to Write(), 
decrypting or
+    /// decompressing as necessary. Returns an error if restoring the data 
fails.
+    /// The write must not be in-flight - the caller is responsible for 
waiting for
+    /// the write to complete.
+    Status RestoreData(
         std::unique_ptr<WriteHandle> handle, MemRange buffer) 
WARN_UNUSED_RESULT;
 
     /// Wait for the in-flight I/Os to complete and destroy resources 
associated with
@@ -255,19 +254,27 @@ class TmpFileMgr {
   /// handle can be passed to FileGroup::Read() to read back the data zero or 
more times.
   /// FileGroup::DestroyWriteHandle() can be called at any time to destroy the 
handle and
   /// allow reuse of the scratch file range written to. Alternatively,
-  /// FileGroup::CancelWriteAndRestoreData() can be called to reverse the 
effects of
-  /// FileGroup::Write() by destroying the handle and restoring the original 
data to the
-  /// buffer, so long as the data in the buffer was not modified by the caller.
+  /// FileGroup::RestoreData() can be called to reverse the effects of 
FileGroup::Write()
+  /// by destroying the handle and restoring the original data to the buffer, 
so long as
+  /// the data in the buffer was not modified by the caller.
   ///
   /// Public methods of WriteHandle are safe to call concurrently from 
multiple threads.
   class WriteHandle {
    public:
     /// The write must be destroyed by passing it to FileGroup - destroying it 
before
-    /// cancelling the write is an error.
-    ~WriteHandle() {
-      DCHECK(!write_in_flight_);
-      DCHECK(is_cancelled_);
-    }
+    /// the write completes is an error.
+    ~WriteHandle() { DCHECK(!write_in_flight_); }
+
+    /// Cancels the write asynchronously. After Cancel() is called, writes are 
not
+    /// retried. The write callback may be called with a CANCELLED status 
(unless
+    /// it succeeded or encountered a different error first).
+    /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't 
need it.
+    void Cancel();
+
+    /// Blocks until the write completes either successfully or unsuccessfully.
+    /// May return before the write callback has been called.
+    /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't 
need it.
+    void WaitForWrite();
 
     /// Path of temporary file backing the block. Intended for use in testing.
     /// Returns empty string if no backing file allocated.
@@ -296,13 +303,6 @@ class TmpFileMgr {
     Status RetryWrite(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* 
file,
         int64_t offset) WARN_UNUSED_RESULT;
 
-    /// Cancels the write asynchronously. After Cancel() is called, writes are 
not
-    /// retried.
-    void Cancel();
-
-    /// Blocks until the write completes either successfully or unsuccessfully.
-    void WaitForWrite();
-
     /// Called when the write has completed successfully or not. Sets 
'write_in_flight_'
     /// then calls 'cb_'.
     void WriteComplete(const Status& write_status);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/testutil/cpu-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/cpu-util.h b/be/src/testutil/cpu-util.h
new file mode 100644
index 0000000..d465c52
--- /dev/null
+++ b/be/src/testutil/cpu-util.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_TESTUTIL_CPU_UTIL_H_
+#define IMPALA_TESTUTIL_CPU_UTIL_H_
+
+#include <pthread.h>
+#include <sched.h>
+
+#include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
+#include "util/cpu-info.h"
+
+namespace impala {
+
+class CpuTestUtil {
+ public:
+  /// Set the thread affinity so that it always runs on 'core'. Fail the test 
if
+  /// unsuccessful.
+  static void PinToCore(int core) {
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    CPU_SET(core, &cpuset);
+    ASSERT_EQ(0, pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), 
&cpuset))
+        << core;
+    ASSERT_EQ(core, CpuInfo::GetCurrentCore());
+  }
+
+  /// Reset the thread affinity of the current thread to all cores.
+  static void ResetAffinity() {
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    for (int i = 0; i < CpuInfo::GetMaxNumCores(); ++i) {
+      CPU_SET(i, &cpuset);
+    }
+    ASSERT_EQ(0, pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), 
&cpuset));
+  }
+
+  /// Choose a random core in [0, CpuInfo::num_cores()) and pin the current 
thread to it.
+  /// Uses 'rng' for randomness.
+  static void PinToRandomCore(std::mt19937* rng) {
+    int core = std::uniform_int_distribution<int>(0, CpuInfo::num_cores() - 
1)(*rng);
+    PinToCore(core);
+  }
+
+  /// Setup a fake NUMA setup where CpuInfo will report a NUMA configuration 
other than
+  /// the system's actual configuration. If 'has_numa' is true, sets it up as 
three NUMA
+  /// nodes with the cores distributed between them. Otherwise sets it up as a 
single
+  /// NUMA node.
+  static void SetupFakeNuma(bool has_numa) {
+    std::vector<int> core_to_node(CpuInfo::GetMaxNumCores());
+    int num_nodes = has_numa ? 3 : 1;
+    for (int i = 0; i < core_to_node.size(); ++i) core_to_node[i] = i % 
num_nodes;
+    CpuInfo::InitFakeNumaForTest(num_nodes, core_to_node);
+    LOG(INFO) << CpuInfo::DebugString();
+  }
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/testutil/rand-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/rand-util.h b/be/src/testutil/rand-util.h
new file mode 100644
index 0000000..6cdbed4
--- /dev/null
+++ b/be/src/testutil/rand-util.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_TESTUTIL_RAND_UTIL_H_
+#define IMPALA_TESTUTIL_RAND_UTIL_H_
+
+#include <cstdint>
+#include <cstdlib>
+#include <random>
+
+#include "common/logging.h"
+
+namespace impala {
+
+/// Test helpers for randomised tests.
+class RandTestUtil {
+ public:
+  /// Seed 'rng' with a seed either from the environment variable 'env_var' or 
the
+  /// current time.
+  static void SeedRng(const char* env_var, std::mt19937* rng) {
+    const char* seed_str = getenv(env_var);
+    int64_t seed;
+    if (seed_str != nullptr) {
+      seed = atoi(seed_str);
+    } else {
+      seed = time(nullptr);
+    }
+    LOG(INFO) << "Random seed (overridable with " << env_var << "): " << seed;
+    rng->seed(seed);
+  }
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/util/cpu-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.cc b/be/src/util/cpu-info.cc
index e5d2a83..a32571e 100644
--- a/be/src/util/cpu-info.cc
+++ b/be/src/util/cpu-info.cc
@@ -77,6 +77,8 @@ int CpuInfo::max_num_cores_;
 string CpuInfo::model_name_ = "unknown";
 int CpuInfo::max_num_numa_nodes_;
 unique_ptr<int[]> CpuInfo::core_to_numa_node_;
+vector<vector<int>> CpuInfo::numa_node_to_cores_;
+vector<int> CpuInfo::numa_node_core_idx_;
 
 static struct {
   string name;
@@ -182,6 +184,7 @@ void CpuInfo::InitNuma() {
     // Assume a single NUMA node.
     max_num_numa_nodes_ = 1;
     std::fill_n(core_to_numa_node_.get(), max_num_cores_, 0);
+    InitNumaNodeToCores();
     return;
   }
 
@@ -215,6 +218,29 @@ void CpuInfo::InitNuma() {
       core_to_numa_node_[core] = 0;
     }
   }
+  InitNumaNodeToCores();
+}
+
+void CpuInfo::InitFakeNumaForTest(
+    int max_num_numa_nodes, const vector<int>& core_to_numa_node) {
+  DCHECK_EQ(max_num_cores_, core_to_numa_node.size());
+  max_num_numa_nodes_ = max_num_numa_nodes;
+  for (int i = 0; i < max_num_cores_; ++i) {
+    core_to_numa_node_[i] = core_to_numa_node[i];
+  }
+  numa_node_to_cores_.clear();
+  InitNumaNodeToCores();
+}
+
+void CpuInfo::InitNumaNodeToCores() {
+  DCHECK(numa_node_to_cores_.empty());
+  numa_node_to_cores_.resize(max_num_numa_nodes_);
+  numa_node_core_idx_.resize(max_num_cores_);
+  for (int core = 0; core < max_num_cores_; ++core) {
+    vector<int>* cores_of_node = 
&numa_node_to_cores_[core_to_numa_node_[core]];
+    numa_node_core_idx_[core] = cores_of_node->size();
+    cores_of_node->push_back(core);
+  }
 }
 
 void CpuInfo::VerifyCpuRequirements() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/util/cpu-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.h b/be/src/util/cpu-info.h
index 28af3e5..38d6782 100644
--- a/be/src/util/cpu-info.h
+++ b/be/src/util/cpu-info.h
@@ -21,6 +21,7 @@
 
 #include <memory>
 #include <string>
+#include <vector>
 #include <boost/cstdint.hpp>
 
 #include "common/logging.h"
@@ -109,10 +110,35 @@ class CpuInfo {
   /// [0, GetMaxNumCores()).
   static int GetNumaNodeOfCore(int core) {
     DCHECK_LE(0, core);
-    DCHECK_LT(core, max_num_numa_nodes_);
+    DCHECK_LT(core, max_num_cores_);
     return core_to_numa_node_[core];
   }
 
+  /// Returns the cores in a NUMA node. 'node' must be in the range
+  /// [0, GetMaxNumNumaNodes()).
+  static const std::vector<int>& GetCoresOfNumaNode(int node) {
+    DCHECK_LE(0, node);
+    DCHECK_LT(node, max_num_numa_nodes_);
+    return numa_node_to_cores_[node];
+  }
+
+  /// Returns the cores in the same NUMA node as 'core'. 'core' must be in the 
range
+  /// [0, GetMaxNumCores()).
+  static const std::vector<int>& GetCoresOfSameNumaNode(int core) {
+    DCHECK_LE(0, core);
+    DCHECK_LT(core, max_num_cores_);
+    return GetCoresOfNumaNode(GetNumaNodeOfCore(core));
+  }
+
+  /// Returns the index of the given core within the vector returned by
+  /// GetCoresOfNumaNode() and GetCoresOfSameNumaNode(). 'core' must be in the 
range
+  /// [0, GetMaxNumCores()).
+  static int GetNumaNodeCoreIdx(int core) {
+    DCHECK_LE(0, core);
+    DCHECK_LT(core, max_num_cores_);
+    return numa_node_core_idx_[core];
+  }
+
   /// Returns the model name of the cpu (e.g. Intel i7-2600)
   static std::string model_name() {
     DCHECK(initialized_);
@@ -150,10 +176,23 @@ class CpuInfo {
     bool reenable_;
   };
 
+ protected:
+  friend class CpuTestUtil;
+
+  /// Setup fake NUMA info to simulate NUMA for backend tests. Sets up CpuInfo 
to
+  /// simulate 'max_num_numa_nodes' with 'core_to_numa_node' specifying the 
NUMA node
+  /// of each core in [0, GetMaxNumCores()).
+  static void InitFakeNumaForTest(
+      int max_num_numa_nodes, const std::vector<int>& core_to_numa_node);
+
  private:
   /// Initialize NUMA-related state - called from Init();
   static void InitNuma();
 
+  /// Initialize 'numa_node_to_cores_' based on 'max_num_numa_nodes_' and
+  /// 'core_to_numa_node_'. Called from InitNuma();
+  static void InitNumaNodeToCores();
+
   /// Populates the arguments with information about this machine's caches.
   /// The values returned are not reliable in some environments, e.g. RHEL5 on 
EC2, so
   /// so we will keep this as a private method.
@@ -173,7 +212,14 @@ class CpuInfo {
 
   /// Array with 'max_num_cores_' entries, each of which is the NUMA node of 
that core.
   static std::unique_ptr<int[]> core_to_numa_node_;
-};
 
+  /// Vector with 'max_num_numa_nodes_' entries, each of which is a vector of 
the cores
+  /// belonging to that NUMA node.
+  static std::vector<std::vector<int>> numa_node_to_cores_;
+
+  /// Array with 'max_num_cores_' entries, each of which is the index of that 
core in its
+  /// NUMA node.
+  static std::vector<int> numa_node_core_idx_;
+};
 }
 #endif


Reply via email to