http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h 
b/be/src/runtime/bufferpool/buffer-pool.h
index 0327425..4e62b3b 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -19,22 +19,25 @@
 #define IMPALA_RUNTIME_BUFFER_POOL_H
 
 #include <stdint.h>
+#include <string>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/locks.hpp>
-#include <string>
 
-#include "runtime/bufferpool/buffer-allocator.h"
-#include "runtime/bufferpool/buffer-pool-counters.h"
 #include "common/atomic.h"
+#include "common/compiler-util.h"
 #include "common/status.h"
 #include "gutil/macros.h"
+#include "runtime/tmp-file-mgr.h"
+#include "util/aligned-new.h"
 #include "util/internal-queue.h"
+#include "util/mem-range.h"
 #include "util/spinlock.h"
 
 namespace impala {
 
 class BufferAllocator;
 class ReservationTracker;
+class RuntimeProfile;
 
 /// A buffer pool that manages memory buffers for all queries in an Impala 
daemon.
 /// The buffer pool enforces buffer reservations, limits, and implements 
policies
@@ -51,11 +54,10 @@ class ReservationTracker;
 /// of granularity required for reporting and enforcement of reservations, 
e.g. an exec
 /// node. The client tracks buffer reservations via its ReservationTracker and 
also
 /// includes info that is helpful for debugging (e.g. the operator that is 
associated
-/// with the buffer). The client is not threadsafe, i.e. concurrent buffer pool
-/// operations should not be invoked for the same client.
+/// with the buffer). Unless otherwise noted, it is not safe to invoke 
concurrent buffer
+/// pool operations for the same client.
 ///
 /// TODO:
-/// * Implement spill-to-disk.
 /// * Decide on, document, and enforce upper limits on page size.
 ///
 /// Pages, Buffers and Pinning
@@ -122,6 +124,8 @@ class ReservationTracker;
 ///
 /// Example Usage: Spillable Pages
 /// ==============================
+/// * In order to spill pages to disk, the Client must be registered with a 
FileGroup,
+///   which is used to allocate scratch space on disk.
 /// * A spilling operator creates a new page with CreatePage().
 /// * The client reads and writes to the page's buffer as it sees fit.
 /// * If the operator encounters memory pressure, it can decrease reservation 
usage by
@@ -140,21 +144,10 @@ class ReservationTracker;
 /// structures - Client, PageHandle and BufferHandle - are not protected from 
concurrent
 /// access by the buffer pool: clients must ensure that they do not invoke 
concurrent
 /// operations with the same Client, PageHandle or BufferHandle.
-//
-/// +========================+
-/// | IMPLEMENTATION DETAILS |
-/// +========================+
-///
-/// Lock Ordering
-/// =============
-/// The lock ordering is:
-/// * pages_::lock_ -> Page::lock_
-///
-/// If a reference to a page is acquired via the pages_ list, pages_::lock_ 
must be held
-/// until done with the page to ensure the page isn't concurrently deleted.
-class BufferPool {
+class BufferPool : public CacheLineAligned {
  public:
   class BufferHandle;
+  class ClientHandle;
   class Client;
   class PageHandle;
 
@@ -170,12 +163,16 @@ class BufferPool {
   /// arguments are invalid. 'name' is an arbitrary name used to identify the 
client in
   /// any errors messages or logging. Counters for this client are added to 
the (non-NULL)
   /// 'profile'. 'client' is the client to register. 'client' should not 
already be
-  /// registered.
+  /// registered. If 'file_group' is non-NULL, it is used to allocate scratch 
space to
+  /// write unpinned pages to disk. If it is NULL, unpinning of pages is not 
allowed for
+  /// this client.
   Status RegisterClient(const std::string& name, ReservationTracker* 
reservation,
-      RuntimeProfile* profile, Client* client) WARN_UNUSED_RESULT;
+      TmpFileMgr::FileGroup* file_group, RuntimeProfile* profile,
+      ClientHandle* client) WARN_UNUSED_RESULT;
 
-  /// Deregister 'client' if it is registered. Idempotent.
-  void DeregisterClient(Client* client);
+  /// Deregister 'client' if it is registered. All pages must be destroyed and 
buffers
+  /// must be freed for the client before calling this. Idempotent.
+  void DeregisterClient(ClientHandle* client);
 
   /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page 
length
   /// supported by BufferPool (see BufferPool class comment). The client must 
have
@@ -183,7 +180,8 @@ class BufferPool {
   /// CreatePage() only fails when a system error prevents the buffer pool 
from fulfilling
   /// the reservation.
   /// On success, the handle is mapped to the new page.
-  Status CreatePage(Client* client, int64_t len, PageHandle* handle) 
WARN_UNUSED_RESULT;
+  Status CreatePage(
+      ClientHandle* client, int64_t len, PageHandle* handle) 
WARN_UNUSED_RESULT;
 
   /// Increment the pin count of 'handle'. After Pin() the underlying page will
   /// be mapped to a buffer, which will be accessible through 'handle'. Uses
@@ -191,82 +189,98 @@ class BufferPool {
   /// unused reservation before calling Pin() (otherwise it will DCHECK). 
Pin() only
   /// fails when a system error prevents the buffer pool from fulfilling the 
reservation.
   /// 'handle' must be open.
-  Status Pin(Client* client, PageHandle* handle) WARN_UNUSED_RESULT;
+  Status Pin(ClientHandle* client, PageHandle* handle) WARN_UNUSED_RESULT;
 
   /// Decrement the pin count of 'handle'. Decrease client's reservation 
usage. If the
   /// handle's pin count becomes zero, it is no longer valid for the 
underlying page's
   /// buffer to be accessed via 'handle'. If the page's total pin count across 
all
   /// handles that reference it goes to zero, the page's data may be written 
to disk and
   /// the buffer reclaimed. 'handle' must be open and have a pin count > 0.
-  /// TODO: once we implement spilling, it will be an error to call Unpin() 
with
-  /// spilling disabled. E.g. if Impala is running without scratch (we want to 
be
-  /// able to test Unpin() before we implement actual spilling).
-  void Unpin(Client* client, PageHandle* handle);
+  ///
+  /// It is an error to reduce the pin count to 0 if 'client' does not have an 
associated
+  /// FileGroup.
+  void Unpin(ClientHandle* client, PageHandle* handle);
 
   /// Destroy the page referenced by 'handle' (if 'handle' is open). Any 
buffers or disk
   /// storage backing the page are freed. Idempotent. If the page is pinned, 
the
   /// reservation usage is decreased accordingly.
-  void DestroyPage(Client* client, PageHandle* handle);
+  void DestroyPage(ClientHandle* client, PageHandle* handle);
 
   /// Extracts buffer from a pinned page. After this returns, the page 
referenced by
   /// 'page_handle' will be destroyed and 'buffer_handle' will reference the 
buffer from
   /// 'page_handle'. This may decrease reservation usage of 'client' if the 
page was
   /// pinned multiple times via 'page_handle'.
   void ExtractBuffer(
-      Client* client, PageHandle* page_handle, BufferHandle* buffer_handle);
+      ClientHandle* client, PageHandle* page_handle, BufferHandle* 
buffer_handle);
 
   /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. 
The caller
   /// is responsible for ensuring it has enough unused reservation before 
calling
   /// AllocateBuffer() (otherwise it will DCHECK). AllocateBuffer() only fails 
when
   /// a system error prevents the buffer pool from fulfilling the reservation.
   Status AllocateBuffer(
-      Client* client, int64_t len, BufferHandle* handle) WARN_UNUSED_RESULT;
+      ClientHandle* client, int64_t len, BufferHandle* handle) 
WARN_UNUSED_RESULT;
 
   /// If 'handle' is open, close 'handle', free the buffer and and decrease the
-  /// reservation usage from 'client'. Idempotent.
-  void FreeBuffer(Client* client, BufferHandle* handle);
+  /// reservation usage from 'client'. Idempotent. Safe to call concurrently 
with
+  /// any other operations for 'client'.
+  void FreeBuffer(ClientHandle* client, BufferHandle* handle);
 
   /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move 
the
   /// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' 
and
   /// decreases reservation usage in 'src_client'. 'src' must be open and 
'dst' must be
   /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be 
different.
-  /// After a successful call, 'src' is closed and 'dst' is open.
-  Status TransferBuffer(Client* src_client, BufferHandle* src, Client* 
dst_client,
-      BufferHandle* dst) WARN_UNUSED_RESULT;
+  /// After a successful call, 'src' is closed and 'dst' is open. Safe to call
+  /// concurrently with any other operations for 'src_client'.
+  Status TransferBuffer(ClientHandle* src_client, BufferHandle* src,
+      ClientHandle* dst_client, BufferHandle* dst) WARN_UNUSED_RESULT;
 
   /// Print a debug string with the state of the buffer pool.
   std::string DebugString();
 
-  int64_t min_buffer_len() const;
-  int64_t buffer_bytes_limit() const;
+  int64_t min_buffer_len() const { return min_buffer_len_; }
+  int64_t buffer_bytes_limit() const { return buffer_bytes_limit_; }
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferPool);
   struct Page;
-
-  /// Same as Unpin(), except the lock for the page referenced by 'handle' 
must be held
-  /// by the caller.
-  void UnpinLocked(Client* client, PageHandle* handle);
-
-  /// Perform the cleanup of the page object and handle when the page is 
destroyed.
-  /// Reset 'handle', free the Page object and remove the 'pages_' entry.
-  /// The 'handle->page_' lock should *not* be held by the caller.
-  void CleanUpPage(PageHandle* handle);
-
   /// Allocate a buffer of length 'len'. Assumes that the client's reservation 
has already
   /// been consumed for the buffer. Returns an error if the pool is unable to 
fulfill the
-  /// reservation.
+  /// reservation. This function may acquire 'clean_pages_lock_' and 
Page::lock so
+  /// no locks lower in the lock acquisition order (see 
buffer-pool-internal.h) should be
+  /// held by the caller.
   Status AllocateBufferInternal(
-      Client* client, int64_t len, BufferHandle* buffer) WARN_UNUSED_RESULT;
+      ClientHandle* client, int64_t len, BufferHandle* buffer) 
WARN_UNUSED_RESULT;
 
   /// Frees 'buffer', which must be open before calling. Closes 'buffer' and 
updates
   /// internal state but does not release to any reservation.
   void FreeBufferInternal(BufferHandle* buffer);
 
-  /// Check if we can allocate another buffer of size 'len' bytes without
-  /// 'buffer_bytes_remaining_' going negative.
-  /// Returns true and decrease 'buffer_bytes_remaining_' by 'len' if 
successful.
-  bool TryDecreaseBufferBytesRemaining(int64_t len);
+  /// Decrease 'buffer_bytes_remaining_' by up to 'len', down to a minimum of 
0.
+  /// Returns the amount it was decreased by.
+  int64_t DecreaseBufferBytesRemaining(int64_t max_decrease);
+
+  /// Adds a clean page 'page' to the global clean pages list, unless the page 
is in the
+  /// process of being cleaned up. Caller must hold the page's client's lock 
via
+  /// 'client_lock' so that moving the page between a client list and the 
global free
+  /// page list is atomic. Caller must not hold 'clean_pages_lock_' or any 
Page::lock.
+  void AddCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* 
page);
+
+  /// Removes a clean page 'page' from the global clean pages list, if 
present. Returns
+  /// true if it was present. Caller must hold the page's client's lock via
+  /// 'client_lock' so that moving the page between list is atomic and there 
is not a
+  /// window so that moving the page between a client list and the global free 
page list
+  /// is atomic. Caller must not hold 'clean_pages_lock_' or any Page::lock.
+  bool RemoveCleanPage(const boost::unique_lock<boost::mutex>& client_lock, 
Page* page);
+
+  /// Evict at least 'bytes_to_evict' bytes of clean pages and free the 
associated
+  /// buffers with 'allocator_'. Any bytes freed in excess of 'bytes_to_evict' 
are
+  /// added to 'buffer_bytes_remaining_.'
+  ///
+  /// Returns an error and adds any freed bytes to 'buffer_bytes_remaining_' 
if not
+  /// enough bytes could be evicted. This will only happen if there is an 
internal
+  /// bug: if all clients write out enough dirty pages to stay within their 
reservation,
+  /// then there should always be enough clean pages.
+  Status EvictCleanPages(int64_t bytes_to_evict);
 
   /// Allocator for allocating and freeing all buffer memory.
   boost::scoped_ptr<BufferAllocator> allocator_;
@@ -281,11 +295,17 @@ class BufferPool {
   /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used 
for
   /// allocating new buffers. Must be updated atomically before a new buffer is
   /// allocated or after an existing buffer is freed.
+  /// TODO: reconsider this to avoid all threads contending on this one value.
   AtomicInt64 buffer_bytes_remaining_;
 
-  /// List containing all pages. Protected by the list's internal lock.
-  typedef InternalQueue<Page> PageList;
-  PageList pages_;
+  /// Unpinned pages that have had their contents written to disk. These pages 
can be
+  /// evicted to allocate a buffer for any client. Pages are evicted in FIFO 
order,
+  /// so that pages are evicted in approximately the same order that the 
clients wrote
+  /// them to disk. 'clean_pages_lock_' protects 'clean_pages_'.
+  /// TODO: consider breaking up by page size
+  /// TODO: consider breaking up by core/NUMA node to improve locality
+  alignas(CACHE_LINE_SIZE) SpinLock clean_pages_lock_;
+  InternalList<Page> clean_pages_;
 };
 
 /// External representation of a client of the BufferPool. Clients are used for
@@ -294,11 +314,11 @@ class BufferPool {
 /// each Client instance is owned by the BufferPool's client, rather than the 
BufferPool.
 /// Each Client should only be used by a single thread at a time: concurrently 
calling
 /// Client methods or BufferPool methods with the Client as an argument is not 
supported.
-class BufferPool::Client {
+class BufferPool::ClientHandle {
  public:
-  Client() : reservation_(NULL) {}
+  ClientHandle() : reservation_(NULL) {}
   /// Client must be deregistered.
-  ~Client() { DCHECK(!is_registered()); }
+  ~ClientHandle() { DCHECK(!is_registered()); }
 
   bool is_registered() const { return reservation_ != NULL; }
   ReservationTracker* reservation() { return reservation_; }
@@ -307,21 +327,14 @@ class BufferPool::Client {
 
  private:
   friend class BufferPool;
-  DISALLOW_COPY_AND_ASSIGN(Client);
-
-  /// Initialize 'counters_' and add the counters to 'profile'.
-  void InitCounters(RuntimeProfile* profile);
-
-  /// A name identifying the client.
-  std::string name_;
+  DISALLOW_COPY_AND_ASSIGN(ClientHandle);
 
   /// The reservation tracker for the client. NULL means the client isn't 
registered.
   /// All pages pinned by the client count as usage against 'reservation_'.
   ReservationTracker* reservation_;
 
-  /// The RuntimeProfile counters for this client. All non-NULL if 
is_registered()
-  /// is true.
-  BufferPoolClientCounters counters_;
+  /// Internal state for the client. Owned by BufferPool.
+  Client* impl_;
 };
 
 /// A handle to a buffer allocated from the buffer pool. Each BufferHandle 
should only
@@ -350,6 +363,8 @@ class BufferPool::BufferHandle {
     return data_;
   }
 
+  MemRange mem_range() const { return MemRange(data(), len()); }
+
   std::string DebugString() const;
 
  private:
@@ -357,14 +372,14 @@ class BufferPool::BufferHandle {
   friend class BufferPool;
 
   /// Internal helper to set the handle to an opened state.
-  void Open(const Client* client, uint8_t* data, int64_t len);
+  void Open(const ClientHandle* client, uint8_t* data, int64_t len);
 
   /// Internal helper to reset the handle to an unopened state.
   void Reset();
 
   /// The client the buffer handle belongs to, used to validate that the 
correct client
   /// is provided in BufferPool method calls.
-  const Client* client_;
+  const ClientHandle* client_;
 
   /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed.
   uint8_t* data_;
@@ -393,9 +408,13 @@ class BufferPool::PageHandle {
   int pin_count() const;
   int64_t len() const;
   /// Get a pointer to the start of the page's buffer. Only valid to call if 
the page
-  /// is pinned via this handle.
+  /// is pinned.
   uint8_t* data() const { return buffer_handle()->data(); }
 
+  /// Convenience function to get the memory range for the page's buffer. Only 
valid to
+  /// call if the page is pinned.
+  MemRange mem_range() const { return buffer_handle()->mem_range(); }
+
   /// Return a pointer to the page's buffer handle. Only valid to call if the 
page is
   /// pinned via this handle. Only const accessors of the returned handle can 
be used:
   /// it is invalid to call FreeBuffer() or TransferBuffer() on it or to 
otherwise modify
@@ -407,10 +426,11 @@ class BufferPool::PageHandle {
  private:
   DISALLOW_COPY_AND_ASSIGN(PageHandle);
   friend class BufferPool;
+  friend class BufferPoolTest;
   friend class Page;
 
   /// Internal helper to open the handle for the given page.
-  void Open(Page* page, Client* client);
+  void Open(Page* page, ClientHandle* client);
 
   /// Internal helper to reset the handle to an unopened state.
   void Reset();
@@ -420,9 +440,8 @@ class BufferPool::PageHandle {
 
   /// The client the page handle belongs to, used to validate that the correct 
client
   /// is being used.
-  const Client* client_;
+  const ClientHandle* client_;
 };
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc 
b/be/src/runtime/bufferpool/suballocator-test.cc
index e389ba4..01f0ea6 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -47,7 +47,7 @@ class SuballocatorTest : public ::testing::Test {
   }
 
   virtual void TearDown() override {
-    for (unique_ptr<BufferPool::Client>& client : clients_) {
+    for (unique_ptr<BufferPool::ClientHandle>& client : clients_) {
       buffer_pool_->DeregisterClient(client.get());
     }
     clients_.clear();
@@ -84,11 +84,12 @@ class SuballocatorTest : public ::testing::Test {
 
   /// Register a client with 'buffer_pool_'. The client is automatically 
deregistered
   /// and freed at the end of the test.
-  void RegisterClient(ReservationTracker* reservation, BufferPool::Client** 
client) {
-    clients_.push_back(make_unique<BufferPool::Client>());
+  void RegisterClient(
+      ReservationTracker* reservation, BufferPool::ClientHandle** client) {
+    clients_.push_back(make_unique<BufferPool::ClientHandle>());
     *client = clients_.back().get();
-    ASSERT_OK(
-        buffer_pool_->RegisterClient("test client", reservation, profile(), 
*client));
+    ASSERT_OK(buffer_pool_->RegisterClient(
+        "test client", reservation, NULL, profile(), *client));
   }
 
   /// Assert that the memory for all of the suballocations is writable and 
disjoint by
@@ -121,7 +122,7 @@ class SuballocatorTest : public ::testing::Test {
   scoped_ptr<BufferPool> buffer_pool_;
 
   /// Clients for the buffer pool. Deregistered and freed after every test.
-  vector<unique_ptr<BufferPool::Client>> clients_;
+  vector<unique_ptr<BufferPool::ClientHandle>> clients_;
 
   /// Global profile - recreated for every test.
   scoped_ptr<RuntimeProfile> profile_;
@@ -137,7 +138,7 @@ const int64_t SuballocatorTest::TEST_BUFFER_LEN;
 TEST_F(SuballocatorTest, SameSizeAllocations) {
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100;
   InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
-  BufferPool::Client* client;
+  BufferPool::ClientHandle* client;
   RegisterClient(&global_reservation_, &client);
   Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
   vector<unique_ptr<Suballocation>> allocs;
@@ -174,7 +175,7 @@ TEST_F(SuballocatorTest, SameSizeAllocations) {
 TEST_F(SuballocatorTest, ZeroLengthAllocation) {
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100;
   InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
-  BufferPool::Client* client;
+  BufferPool::ClientHandle* client;
   RegisterClient(&global_reservation_, &client);
   Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
   unique_ptr<Suballocation> alloc;
@@ -191,7 +192,7 @@ TEST_F(SuballocatorTest, ZeroLengthAllocation) {
 TEST_F(SuballocatorTest, OutOfRangeAllocations) {
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100;
   InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
-  BufferPool::Client* client;
+  BufferPool::ClientHandle* client;
   RegisterClient(&global_reservation_, &client);
   Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
   unique_ptr<Suballocation> alloc;
@@ -210,7 +211,7 @@ TEST_F(SuballocatorTest, OutOfRangeAllocations) {
 TEST_F(SuballocatorTest, NonPowerOfTwoAllocations) {
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 128;
   InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
-  BufferPool::Client* client;
+  BufferPool::ClientHandle* client;
   RegisterClient(&global_reservation_, &client);
   Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
 
@@ -249,7 +250,7 @@ TEST_F(SuballocatorTest, NonPowerOfTwoAllocations) {
 TEST_F(SuballocatorTest, DoublingAllocations) {
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100;
   InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
-  BufferPool::Client* client;
+  BufferPool::ClientHandle* client;
   RegisterClient(&global_reservation_, &client);
   Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
 
@@ -306,7 +307,7 @@ TEST_F(SuballocatorTest, DoublingAllocations) {
 TEST_F(SuballocatorTest, RandomAllocations) {
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 1000;
   InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
-  BufferPool::Client* client;
+  BufferPool::ClientHandle* client;
   RegisterClient(&global_reservation_, &client);
   Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/suballocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator.cc 
b/be/src/runtime/bufferpool/suballocator.cc
index c41159e..a4835a4 100644
--- a/be/src/runtime/bufferpool/suballocator.cc
+++ b/be/src/runtime/bufferpool/suballocator.cc
@@ -34,7 +34,7 @@ constexpr int64_t Suballocator::MIN_ALLOCATION_BYTES;
 const int Suballocator::NUM_FREE_LISTS;
 
 Suballocator::Suballocator(
-    BufferPool* pool, BufferPool::Client* client, int64_t min_buffer_len)
+    BufferPool* pool, BufferPool::ClientHandle* client, int64_t min_buffer_len)
   : pool_(pool), client_(client), min_buffer_len_(min_buffer_len), 
allocated_(0) {}
 
 Suballocator::~Suballocator() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/suballocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator.h 
b/be/src/runtime/bufferpool/suballocator.h
index 6b08a8e..53f4ef5 100644
--- a/be/src/runtime/bufferpool/suballocator.h
+++ b/be/src/runtime/bufferpool/suballocator.h
@@ -67,7 +67,8 @@ class Suballocator {
   /// Constructs a suballocator that allocates memory from 'pool' with 
'client'.
   /// Suballocations smaller than 'min_buffer_len' are handled by allocating a
   /// buffer of 'min_buffer_len' and recursively splitting it.
-  Suballocator(BufferPool* pool, BufferPool::Client* client, int64_t 
min_buffer_len);
+  Suballocator(
+      BufferPool* pool, BufferPool::ClientHandle* client, int64_t 
min_buffer_len);
 
   ~Suballocator();
 
@@ -134,7 +135,7 @@ class Suballocator {
 
   /// The pool and corresponding client to allocate buffers from.
   BufferPool* pool_;
-  BufferPool::Client* client_;
+  BufferPool::ClientHandle* client_;
 
   /// The minimum length of buffer to allocate. To serve allocations below 
this threshold,
   /// a larger buffer is allocated and split into multiple allocations.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 7a93ca2..3450046 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -1219,7 +1219,6 @@ int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
 }
 
 Status DiskIoMgr::AddWriteRange(DiskIoRequestContext* writer, WriteRange* 
write_range) {
-  DCHECK_LE(write_range->len(), max_buffer_size_);
   unique_lock<mutex> writer_lock(writer->lock_);
 
   if (writer->state_ == DiskIoRequestContext::Cancelled) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 932e9c1..347db96 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -556,13 +556,16 @@ void TmpFileMgr::WriteHandle::WriteComplete(const Status& 
write_status) {
     lock_guard<mutex> lock(write_state_lock_);
     DCHECK(write_in_flight_);
     write_in_flight_ = false;
-    // Need to extract 'cb_' because once 'write_in_flight_' is false, the 
WriteHandle
-    // may be destroyed.
+    // Need to extract 'cb_' because once 'write_in_flight_' is false and we 
release
+    // 'write_state_lock_', 'this' may be destroyed.
     cb = move(cb_);
+
+    // Notify before releasing the lock - after the lock is released 'this' 
may be
+    // destroyed.
+    write_complete_cv_.NotifyAll();
   }
-  write_complete_cv_.NotifyAll();
-  // Call 'cb' once we've updated the state. We must do this last because once 
'cb' is
-  // called, it is valid to call Read() on the handle.
+  // Call 'cb' last - once 'cb' is called client code may call Read() or 
destroy this
+  // handle.
   cb(write_status);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 409c7ce..cab2d87 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -145,6 +145,8 @@ class TmpFileMgr {
 
     const TUniqueId& unique_id() const { return unique_id_; }
 
+    TmpFileMgr* tmp_file_mgr() const { return tmp_file_mgr_; }
+
    private:
     friend class File;
     friend class TmpFileMgrTest;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/testutil/death-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/death-test-util.h 
b/be/src/testutil/death-test-util.h
index 8522b61..6421fb7 100644
--- a/be/src/testutil/death-test-util.h
+++ b/be/src/testutil/death-test-util.h
@@ -28,7 +28,7 @@
 #define IMPALA_ASSERT_DEBUG_DEATH(fn, msg)    \
   do {                                        \
     ScopedCoredumpDisabler disable_coredumps; \
-    ASSERT_DEBUG_DEATH(fn, msg);              \
+    ASSERT_DEBUG_DEATH((void)fn, msg);              \
   } while (false);
 #else
 // Gtest's ASSERT_DEBUG_DEATH macro has peculiar semantics where in debug 
builds it

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/util/aligned-new.h
----------------------------------------------------------------------
diff --git a/be/src/util/aligned-new.h b/be/src/util/aligned-new.h
index 3a4270c..b9197d9 100644
--- a/be/src/util/aligned-new.h
+++ b/be/src/util/aligned-new.h
@@ -20,6 +20,7 @@
 
 #include <memory>
 
+#include "common/compiler-util.h"
 #include "common/logging.h"
 
 namespace impala {
@@ -49,7 +50,7 @@ struct alignas(ALIGNMENT) AlignedNew {
   }
 };
 
-using CacheLineAligned = AlignedNew<64>;
+using CacheLineAligned = AlignedNew<CACHE_LINE_SIZE>;
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/util/fake-lock.h
----------------------------------------------------------------------
diff --git a/be/src/util/fake-lock.h b/be/src/util/fake-lock.h
new file mode 100644
index 0000000..22e8272
--- /dev/null
+++ b/be/src/util/fake-lock.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_FAKE_LOCK_H
+#define IMPALA_UTIL_FAKE_LOCK_H
+
+namespace impala {
+
+// Implementation of Boost's lockable interface that does nothing. Used to 
replace an
+// actual lock implementation in template classes in if no thread safety is 
needed.
+class FakeLock {
+ public:
+  FakeLock() {}
+  void lock() {}
+  void unlock() {}
+  bool try_lock() { return true; }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(FakeLock);
+};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/util/internal-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/internal-queue.h b/be/src/util/internal-queue.h
index 37a9a0c..5e32116 100644
--- a/be/src/util/internal-queue.h
+++ b/be/src/util/internal-queue.h
@@ -22,55 +22,64 @@
 #include <boost/function.hpp>
 #include <boost/thread/locks.hpp>
 
+#include "util/fake-lock.h"
 #include "util/spinlock.h"
 
 namespace impala {
 
-/// Thread safe fifo-queue. This is an internal queue, meaning the links to 
nodes
-/// are maintained in the object itself. This is in contrast to the stl list 
which
-/// allocates a wrapper Node object around the data. Since it's an internal 
queue,
-/// the list pointers are maintained in the Nodes which is memory owned by the 
user.
-/// The nodes cannot be deallocated while the queue has elements.
-/// To use: subclass InternalQueue::Node.
+/// FIFO queue implemented as a doubly-linked lists with internal pointers. 
This is in
+/// contrast to the STL list which allocates a wrapper Node object around the 
data. Since
+/// it's an internal queue, the list pointers are maintained in the Nodes 
which is memory
+/// owned by the user. The nodes cannot be deallocated while the queue has 
elements.
 /// The internal structure is a doubly-linked list.
 ///  NULL <-- N1 <--> N2 <--> N3 --> NULL
 ///          (head)          (tail)
+///
+/// InternalQueue<T> instantiates a thread-safe queue where the queue is 
protected by an
+/// internal Spinlock. InternalList<T> instantiates a list with no thread 
safety.
+///
+/// To use these data structures, the element to be added to the queue or list 
must
+/// subclass ::Node.
+///
 /// TODO: this is an ideal candidate to be made lock free.
 
-/// T must be a subclass of InternalQueue::Node
-template<typename T>
-class InternalQueue {
+/// T must be a subclass of InternalQueueBase::Node.
+template <typename LockType, typename T>
+class InternalQueueBase {
  public:
   struct Node {
    public:
     Node() : parent_queue(NULL), next(NULL), prev(NULL) {}
     virtual ~Node() {}
 
+    /// Returns true if the node is in a queue.
+    bool in_queue() const { return parent_queue != NULL; }
+
     /// Returns the Next/Prev node or NULL if this is the end/front.
     T* Next() const {
-      boost::lock_guard<SpinLock> lock(parent_queue->lock_);
+      boost::lock_guard<LockType> lock(parent_queue->lock_);
       return reinterpret_cast<T*>(next);
     }
     T* Prev() const {
-      boost::lock_guard<SpinLock> lock(parent_queue->lock_);
+      boost::lock_guard<LockType> lock(parent_queue->lock_);
       return reinterpret_cast<T*>(prev);
     }
 
    private:
-    friend class InternalQueue;
+    friend class InternalQueueBase<LockType, T>;
 
     /// Pointer to the queue this Node is on. NULL if not on any queue.
-    InternalQueue* parent_queue;
+    InternalQueueBase<LockType, T>* parent_queue;
     Node* next;
     Node* prev;
   };
 
-  InternalQueue() : head_(NULL), tail_(NULL), size_(0) {}
+  InternalQueueBase() : head_(NULL), tail_(NULL), size_(0) {}
 
   /// Returns the element at the head of the list without dequeuing or NULL
   /// if the queue is empty. This is O(1).
   T* head() const {
-    boost::lock_guard<SpinLock> lock(lock_);
+    boost::lock_guard<LockType> lock(lock_);
     if (empty()) return NULL;
     return reinterpret_cast<T*>(head_);
   }
@@ -78,7 +87,7 @@ class InternalQueue {
   /// Returns the element at the end of the list without dequeuing or NULL
   /// if the queue is empty. This is O(1).
   T* tail() {
-    boost::lock_guard<SpinLock> lock(lock_);
+    boost::lock_guard<LockType> lock(lock_);
     if (empty()) return NULL;
     return reinterpret_cast<T*>(tail_);
   }
@@ -91,7 +100,7 @@ class InternalQueue {
     DCHECK(node->parent_queue == NULL);
     node->parent_queue = this;
     {
-      boost::lock_guard<SpinLock> lock(lock_);
+      boost::lock_guard<LockType> lock(lock_);
       if (tail_ != NULL) tail_->next = node;
       node->prev = tail_;
       tail_ = node;
@@ -105,7 +114,7 @@ class InternalQueue {
   T* Dequeue() {
     Node* result = NULL;
     {
-      boost::lock_guard<SpinLock> lock(lock_);
+      boost::lock_guard<LockType> lock(lock_);
       if (empty()) return NULL;
       --size_;
       result = head_;
@@ -127,7 +136,7 @@ class InternalQueue {
   T* PopBack() {
     Node* result = NULL;
     {
-      boost::lock_guard<SpinLock> lock(lock_);
+      boost::lock_guard<LockType> lock(lock_);
       if (empty()) return NULL;
       --size_;
       result = tail_;
@@ -151,7 +160,7 @@ class InternalQueue {
     if (node->parent_queue == NULL) return;
     DCHECK(node->parent_queue == this);
     {
-      boost::lock_guard<SpinLock> lock(lock_);
+      boost::lock_guard<LockType> lock(lock_);
       if (node->next == NULL && node->prev == NULL) {
         // Removing only node
         DCHECK(node == head_);
@@ -184,7 +193,7 @@ class InternalQueue {
 
   /// Clears all elements in the list.
   void Clear() {
-    boost::lock_guard<SpinLock> lock(lock_);
+    boost::lock_guard<LockType> lock(lock_);
     Node* cur = head_;
     while (cur != NULL) {
       Node* tmp = cur;
@@ -199,8 +208,7 @@ class InternalQueue {
   int size() const { return size_; }
   bool empty() const { return head_ == NULL; }
 
-  /// Returns if the target is on the queue. This is O(1) and intended to
-  /// be used for debugging.
+  /// Returns if the target is on the queue. This is O(1) and does not acquire 
any locks.
   bool Contains(const T* target) const {
     return target->parent_queue == this;
   }
@@ -208,7 +216,7 @@ class InternalQueue {
   /// Validates the internal structure of the list
   bool Validate() {
     int num_elements_found = 0;
-    boost::lock_guard<SpinLock> lock(lock_);
+    boost::lock_guard<LockType> lock(lock_);
     if (head_ == NULL) {
       if (tail_ != NULL) return false;
       if (size() != 0) return false;
@@ -236,7 +244,7 @@ class InternalQueue {
   // false, terminate iteration. It is invalid to call other InternalQueue 
methods
   // from 'fn'.
   void Iterate(boost::function<bool(T*)> fn) {
-    boost::lock_guard<SpinLock> lock(lock_);
+    boost::lock_guard<LockType> lock(lock_);
     for (Node* current = head_; current != NULL; current = current->next) {
       if (!fn(reinterpret_cast<T*>(current))) return;
     }
@@ -247,7 +255,7 @@ class InternalQueue {
     std::stringstream ss;
     ss << "(";
     {
-      boost::lock_guard<SpinLock> lock(lock_);
+      boost::lock_guard<LockType> lock(lock_);
       Node* curr = head_;
       while (curr != NULL) {
         ss << (void*)curr;
@@ -260,11 +268,17 @@ class InternalQueue {
 
  private:
   friend struct Node;
-  mutable SpinLock lock_;
-  Node* head_, *tail_;
+  mutable LockType lock_;
+  Node *head_, *tail_;
   int size_;
 };
 
-}
+// The default LockType is SpinLock.
+template <typename T>
+class InternalQueue : public InternalQueueBase<SpinLock, T> {};
 
+// InternalList is a non-threadsafe implementation.
+template <typename T>
+class InternalList : public InternalQueueBase<FakeLock, T> {};
+}
 #endif

Reply via email to