http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h index 0327425..4e62b3b 100644 --- a/be/src/runtime/bufferpool/buffer-pool.h +++ b/be/src/runtime/bufferpool/buffer-pool.h @@ -19,22 +19,25 @@ #define IMPALA_RUNTIME_BUFFER_POOL_H #include <stdint.h> +#include <string> #include <boost/scoped_ptr.hpp> #include <boost/thread/locks.hpp> -#include <string> -#include "runtime/bufferpool/buffer-allocator.h" -#include "runtime/bufferpool/buffer-pool-counters.h" #include "common/atomic.h" +#include "common/compiler-util.h" #include "common/status.h" #include "gutil/macros.h" +#include "runtime/tmp-file-mgr.h" +#include "util/aligned-new.h" #include "util/internal-queue.h" +#include "util/mem-range.h" #include "util/spinlock.h" namespace impala { class BufferAllocator; class ReservationTracker; +class RuntimeProfile; /// A buffer pool that manages memory buffers for all queries in an Impala daemon. /// The buffer pool enforces buffer reservations, limits, and implements policies @@ -51,11 +54,10 @@ class ReservationTracker; /// of granularity required for reporting and enforcement of reservations, e.g. an exec /// node. The client tracks buffer reservations via its ReservationTracker and also /// includes info that is helpful for debugging (e.g. the operator that is associated -/// with the buffer). The client is not threadsafe, i.e. concurrent buffer pool -/// operations should not be invoked for the same client. +/// with the buffer). Unless otherwise noted, it is not safe to invoke concurrent buffer +/// pool operations for the same client. /// /// TODO: -/// * Implement spill-to-disk. /// * Decide on, document, and enforce upper limits on page size. 
/// /// Pages, Buffers and Pinning @@ -122,6 +124,8 @@ class ReservationTracker; /// /// Example Usage: Spillable Pages /// ============================== +/// * In order to spill pages to disk, the Client must be registered with a FileGroup, +/// which is used to allocate scratch space on disk. /// * A spilling operator creates a new page with CreatePage(). /// * The client reads and writes to the page's buffer as it sees fit. /// * If the operator encounters memory pressure, it can decrease reservation usage by @@ -140,21 +144,10 @@ class ReservationTracker; /// structures - Client, PageHandle and BufferHandle - are not protected from concurrent /// access by the buffer pool: clients must ensure that they do not invoke concurrent /// operations with the same Client, PageHandle or BufferHandle. -// -/// +========================+ -/// | IMPLEMENTATION DETAILS | -/// +========================+ -/// -/// Lock Ordering -/// ============= -/// The lock ordering is: -/// * pages_::lock_ -> Page::lock_ -/// -/// If a reference to a page is acquired via the pages_ list, pages_::lock_ must be held -/// until done with the page to ensure the page isn't concurrently deleted. -class BufferPool { +class BufferPool : public CacheLineAligned { public: class BufferHandle; + class ClientHandle; class Client; class PageHandle; @@ -170,12 +163,16 @@ class BufferPool { /// arguments are invalid. 'name' is an arbitrary name used to identify the client in /// any errors messages or logging. Counters for this client are added to the (non-NULL) /// 'profile'. 'client' is the client to register. 'client' should not already be - /// registered. + /// registered. If 'file_group' is non-NULL, it is used to allocate scratch space to + /// write unpinned pages to disk. If it is NULL, unpinning of pages is not allowed for + /// this client. 
Status RegisterClient(const std::string& name, ReservationTracker* reservation, - RuntimeProfile* profile, Client* client) WARN_UNUSED_RESULT; + TmpFileMgr::FileGroup* file_group, RuntimeProfile* profile, + ClientHandle* client) WARN_UNUSED_RESULT; - /// Deregister 'client' if it is registered. Idempotent. - void DeregisterClient(Client* client); + /// Deregister 'client' if it is registered. All pages must be destroyed and buffers + /// must be freed for the client before calling this. Idempotent. + void DeregisterClient(ClientHandle* client); /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length /// supported by BufferPool (see BufferPool class comment). The client must have @@ -183,7 +180,8 @@ class BufferPool { /// CreatePage() only fails when a system error prevents the buffer pool from fulfilling /// the reservation. /// On success, the handle is mapped to the new page. - Status CreatePage(Client* client, int64_t len, PageHandle* handle) WARN_UNUSED_RESULT; + Status CreatePage( + ClientHandle* client, int64_t len, PageHandle* handle) WARN_UNUSED_RESULT; /// Increment the pin count of 'handle'. After Pin() the underlying page will /// be mapped to a buffer, which will be accessible through 'handle'. Uses @@ -191,82 +189,98 @@ class BufferPool { /// unused reservation before calling Pin() (otherwise it will DCHECK). Pin() only /// fails when a system error prevents the buffer pool from fulfilling the reservation. /// 'handle' must be open. - Status Pin(Client* client, PageHandle* handle) WARN_UNUSED_RESULT; + Status Pin(ClientHandle* client, PageHandle* handle) WARN_UNUSED_RESULT; /// Decrement the pin count of 'handle'. Decrease client's reservation usage. If the /// handle's pin count becomes zero, it is no longer valid for the underlying page's /// buffer to be accessed via 'handle'. 
If the page's total pin count across all /// handles that reference it goes to zero, the page's data may be written to disk and /// the buffer reclaimed. 'handle' must be open and have a pin count > 0. - /// TODO: once we implement spilling, it will be an error to call Unpin() with - /// spilling disabled. E.g. if Impala is running without scratch (we want to be - /// able to test Unpin() before we implement actual spilling). - void Unpin(Client* client, PageHandle* handle); + /// + /// It is an error to reduce the pin count to 0 if 'client' does not have an associated + /// FileGroup. + void Unpin(ClientHandle* client, PageHandle* handle); /// Destroy the page referenced by 'handle' (if 'handle' is open). Any buffers or disk /// storage backing the page are freed. Idempotent. If the page is pinned, the /// reservation usage is decreased accordingly. - void DestroyPage(Client* client, PageHandle* handle); + void DestroyPage(ClientHandle* client, PageHandle* handle); /// Extracts buffer from a pinned page. After this returns, the page referenced by /// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from /// 'page_handle'. This may decrease reservation usage of 'client' if the page was /// pinned multiple times via 'page_handle'. void ExtractBuffer( - Client* client, PageHandle* page_handle, BufferHandle* buffer_handle); + ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle); /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller /// is responsible for ensuring it has enough unused reservation before calling /// AllocateBuffer() (otherwise it will DCHECK). AllocateBuffer() only fails when /// a system error prevents the buffer pool from fulfilling the reservation. 
Status AllocateBuffer( - Client* client, int64_t len, BufferHandle* handle) WARN_UNUSED_RESULT; + ClientHandle* client, int64_t len, BufferHandle* handle) WARN_UNUSED_RESULT; /// If 'handle' is open, close 'handle', free the buffer and and decrease the - /// reservation usage from 'client'. Idempotent. - void FreeBuffer(Client* client, BufferHandle* handle); + /// reservation usage from 'client'. Idempotent. Safe to call concurrently with + /// any other operations for 'client'. + void FreeBuffer(ClientHandle* client, BufferHandle* handle); /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the /// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' and /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different. - /// After a successful call, 'src' is closed and 'dst' is open. - Status TransferBuffer(Client* src_client, BufferHandle* src, Client* dst_client, - BufferHandle* dst) WARN_UNUSED_RESULT; + /// After a successful call, 'src' is closed and 'dst' is open. Safe to call + /// concurrently with any other operations for 'src_client'. + Status TransferBuffer(ClientHandle* src_client, BufferHandle* src, + ClientHandle* dst_client, BufferHandle* dst) WARN_UNUSED_RESULT; /// Print a debug string with the state of the buffer pool. std::string DebugString(); - int64_t min_buffer_len() const; - int64_t buffer_bytes_limit() const; + int64_t min_buffer_len() const { return min_buffer_len_; } + int64_t buffer_bytes_limit() const { return buffer_bytes_limit_; } private: DISALLOW_COPY_AND_ASSIGN(BufferPool); struct Page; - - /// Same as Unpin(), except the lock for the page referenced by 'handle' must be held - /// by the caller. - void UnpinLocked(Client* client, PageHandle* handle); - - /// Perform the cleanup of the page object and handle when the page is destroyed. 
- /// Reset 'handle', free the Page object and remove the 'pages_' entry. - /// The 'handle->page_' lock should *not* be held by the caller. - void CleanUpPage(PageHandle* handle); - /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the - /// reservation. + /// reservation. This function may acquire 'clean_pages_lock_' and Page::lock so + /// no locks lower in the lock acquisition order (see buffer-pool-internal.h) should be + /// held by the caller. Status AllocateBufferInternal( - Client* client, int64_t len, BufferHandle* buffer) WARN_UNUSED_RESULT; + ClientHandle* client, int64_t len, BufferHandle* buffer) WARN_UNUSED_RESULT; /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates /// internal state but does not release to any reservation. void FreeBufferInternal(BufferHandle* buffer); - /// Check if we can allocate another buffer of size 'len' bytes without - /// 'buffer_bytes_remaining_' going negative. - /// Returns true and decrease 'buffer_bytes_remaining_' by 'len' if successful. - bool TryDecreaseBufferBytesRemaining(int64_t len); + /// Decrease 'buffer_bytes_remaining_' by up to 'len', down to a minimum of 0. + /// Returns the amount it was decreased by. + int64_t DecreaseBufferBytesRemaining(int64_t max_decrease); + + /// Adds a clean page 'page' to the global clean pages list, unless the page is in the + /// process of being cleaned up. Caller must hold the page's client's lock via + /// 'client_lock' so that moving the page between a client list and the global free + /// page list is atomic. Caller must not hold 'clean_pages_lock_' or any Page::lock. + void AddCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* page); + + /// Removes a clean page 'page' from the global clean pages list, if present. Returns + /// true if it was present. 
Caller must hold the page's client's lock via + /// 'client_lock' so that moving the page between a client list and the global free + /// page list is atomic. Caller must not hold 'clean_pages_lock_' or any Page::lock. + bool RemoveCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* page); + + /// Evict at least 'bytes_to_evict' bytes of clean pages and free the associated + /// buffers with 'allocator_'. Any bytes freed in excess of 'bytes_to_evict' are + /// added to 'buffer_bytes_remaining_'. + /// + /// Returns an error and adds any freed bytes to 'buffer_bytes_remaining_' if not + /// enough bytes could be evicted. This will only happen if there is an internal + /// bug: if all clients write out enough dirty pages to stay within their reservation, + /// then there should always be enough clean pages. Status EvictCleanPages(int64_t bytes_to_evict); /// Allocator for allocating and freeing all buffer memory. boost::scoped_ptr<BufferAllocator> allocator_; @@ -281,11 +295,17 @@ class BufferPool { /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for /// allocating new buffers. Must be updated atomically before a new buffer is /// allocated or after an existing buffer is freed. + /// TODO: reconsider this to avoid all threads contending on this one value. AtomicInt64 buffer_bytes_remaining_; - /// List containing all pages. Protected by the list's internal lock. - typedef InternalQueue<Page> PageList; - PageList pages_; + /// Unpinned pages that have had their contents written to disk. These pages can be + /// evicted to allocate a buffer for any client. Pages are evicted in FIFO order, + /// so that pages are evicted in approximately the same order that the clients wrote + /// them to disk. 'clean_pages_lock_' protects 'clean_pages_'. 
+ /// TODO: consider breaking up by page size + /// TODO: consider breaking up by core/NUMA node to improve locality + alignas(CACHE_LINE_SIZE) SpinLock clean_pages_lock_; + InternalList<Page> clean_pages_; }; /// External representation of a client of the BufferPool. Clients are used for @@ -294,11 +314,11 @@ class BufferPool { /// each Client instance is owned by the BufferPool's client, rather than the BufferPool. /// Each Client should only be used by a single thread at a time: concurrently calling /// Client methods or BufferPool methods with the Client as an argument is not supported. -class BufferPool::Client { +class BufferPool::ClientHandle { public: - Client() : reservation_(NULL) {} + ClientHandle() : reservation_(NULL) {} /// Client must be deregistered. - ~Client() { DCHECK(!is_registered()); } + ~ClientHandle() { DCHECK(!is_registered()); } bool is_registered() const { return reservation_ != NULL; } ReservationTracker* reservation() { return reservation_; } @@ -307,21 +327,14 @@ class BufferPool::Client { private: friend class BufferPool; - DISALLOW_COPY_AND_ASSIGN(Client); - - /// Initialize 'counters_' and add the counters to 'profile'. - void InitCounters(RuntimeProfile* profile); - - /// A name identifying the client. - std::string name_; + DISALLOW_COPY_AND_ASSIGN(ClientHandle); /// The reservation tracker for the client. NULL means the client isn't registered. /// All pages pinned by the client count as usage against 'reservation_'. ReservationTracker* reservation_; - /// The RuntimeProfile counters for this client. All non-NULL if is_registered() - /// is true. - BufferPoolClientCounters counters_; + /// Internal state for the client. Owned by BufferPool. + Client* impl_; }; /// A handle to a buffer allocated from the buffer pool. 
Each BufferHandle should only @@ -350,6 +363,8 @@ class BufferPool::BufferHandle { return data_; } + MemRange mem_range() const { return MemRange(data(), len()); } + std::string DebugString() const; private: @@ -357,14 +372,14 @@ class BufferPool::BufferHandle { friend class BufferPool; /// Internal helper to set the handle to an opened state. - void Open(const Client* client, uint8_t* data, int64_t len); + void Open(const ClientHandle* client, uint8_t* data, int64_t len); /// Internal helper to reset the handle to an unopened state. void Reset(); /// The client the buffer handle belongs to, used to validate that the correct client /// is provided in BufferPool method calls. - const Client* client_; + const ClientHandle* client_; /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed. uint8_t* data_; @@ -393,9 +408,13 @@ class BufferPool::PageHandle { int pin_count() const; int64_t len() const; /// Get a pointer to the start of the page's buffer. Only valid to call if the page - /// is pinned via this handle. + /// is pinned. uint8_t* data() const { return buffer_handle()->data(); } + /// Convenience function to get the memory range for the page's buffer. Only valid to + /// call if the page is pinned. + MemRange mem_range() const { return buffer_handle()->mem_range(); } + /// Return a pointer to the page's buffer handle. Only valid to call if the page is /// pinned via this handle. Only const accessors of the returned handle can be used: /// it is invalid to call FreeBuffer() or TransferBuffer() on it or to otherwise modify @@ -407,10 +426,11 @@ class BufferPool::PageHandle { private: DISALLOW_COPY_AND_ASSIGN(PageHandle); friend class BufferPool; + friend class BufferPoolTest; friend class Page; /// Internal helper to open the handle for the given page. - void Open(Page* page, Client* client); + void Open(Page* page, ClientHandle* client); /// Internal helper to reset the handle to an unopened state. 
void Reset(); @@ -420,9 +440,8 @@ class BufferPool::PageHandle { /// The client the page handle belongs to, used to validate that the correct client /// is being used. - const Client* client_; + const ClientHandle* client_; }; - } #endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/suballocator-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc index e389ba4..01f0ea6 100644 --- a/be/src/runtime/bufferpool/suballocator-test.cc +++ b/be/src/runtime/bufferpool/suballocator-test.cc @@ -47,7 +47,7 @@ class SuballocatorTest : public ::testing::Test { } virtual void TearDown() override { - for (unique_ptr<BufferPool::Client>& client : clients_) { + for (unique_ptr<BufferPool::ClientHandle>& client : clients_) { buffer_pool_->DeregisterClient(client.get()); } clients_.clear(); @@ -84,11 +84,12 @@ class SuballocatorTest : public ::testing::Test { /// Register a client with 'buffer_pool_'. The client is automatically deregistered /// and freed at the end of the test. - void RegisterClient(ReservationTracker* reservation, BufferPool::Client** client) { - clients_.push_back(make_unique<BufferPool::Client>()); + void RegisterClient( + ReservationTracker* reservation, BufferPool::ClientHandle** client) { + clients_.push_back(make_unique<BufferPool::ClientHandle>()); *client = clients_.back().get(); - ASSERT_OK( - buffer_pool_->RegisterClient("test client", reservation, profile(), *client)); + ASSERT_OK(buffer_pool_->RegisterClient( + "test client", reservation, NULL, profile(), *client)); } /// Assert that the memory for all of the suballocations is writable and disjoint by @@ -121,7 +122,7 @@ class SuballocatorTest : public ::testing::Test { scoped_ptr<BufferPool> buffer_pool_; /// Clients for the buffer pool. Deregistered and freed after every test. - vector<unique_ptr<BufferPool::Client>> clients_; + vector<unique_ptr<BufferPool::ClientHandle>> clients_; /// Global profile - recreated for every test. 
scoped_ptr<RuntimeProfile> profile_; @@ -137,7 +138,7 @@ const int64_t SuballocatorTest::TEST_BUFFER_LEN; TEST_F(SuballocatorTest, SameSizeAllocations) { const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100; InitPool(TEST_BUFFER_LEN, TOTAL_MEM); - BufferPool::Client* client; + BufferPool::ClientHandle* client; RegisterClient(&global_reservation_, &client); Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN); vector<unique_ptr<Suballocation>> allocs; @@ -174,7 +175,7 @@ TEST_F(SuballocatorTest, SameSizeAllocations) { TEST_F(SuballocatorTest, ZeroLengthAllocation) { const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100; InitPool(TEST_BUFFER_LEN, TOTAL_MEM); - BufferPool::Client* client; + BufferPool::ClientHandle* client; RegisterClient(&global_reservation_, &client); Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN); unique_ptr<Suballocation> alloc; @@ -191,7 +192,7 @@ TEST_F(SuballocatorTest, ZeroLengthAllocation) { TEST_F(SuballocatorTest, OutOfRangeAllocations) { const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100; InitPool(TEST_BUFFER_LEN, TOTAL_MEM); - BufferPool::Client* client; + BufferPool::ClientHandle* client; RegisterClient(&global_reservation_, &client); Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN); unique_ptr<Suballocation> alloc; @@ -210,7 +211,7 @@ TEST_F(SuballocatorTest, OutOfRangeAllocations) { TEST_F(SuballocatorTest, NonPowerOfTwoAllocations) { const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 128; InitPool(TEST_BUFFER_LEN, TOTAL_MEM); - BufferPool::Client* client; + BufferPool::ClientHandle* client; RegisterClient(&global_reservation_, &client); Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN); @@ -249,7 +250,7 @@ TEST_F(SuballocatorTest, NonPowerOfTwoAllocations) { TEST_F(SuballocatorTest, DoublingAllocations) { const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100; InitPool(TEST_BUFFER_LEN, TOTAL_MEM); - BufferPool::Client* client; + BufferPool::ClientHandle* client; RegisterClient(&global_reservation_, &client); 
Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN); @@ -306,7 +307,7 @@ TEST_F(SuballocatorTest, DoublingAllocations) { TEST_F(SuballocatorTest, RandomAllocations) { const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 1000; InitPool(TEST_BUFFER_LEN, TOTAL_MEM); - BufferPool::Client* client; + BufferPool::ClientHandle* client; RegisterClient(&global_reservation_, &client); Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/suballocator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/suballocator.cc b/be/src/runtime/bufferpool/suballocator.cc index c41159e..a4835a4 100644 --- a/be/src/runtime/bufferpool/suballocator.cc +++ b/be/src/runtime/bufferpool/suballocator.cc @@ -34,7 +34,7 @@ constexpr int64_t Suballocator::MIN_ALLOCATION_BYTES; const int Suballocator::NUM_FREE_LISTS; Suballocator::Suballocator( - BufferPool* pool, BufferPool::Client* client, int64_t min_buffer_len) + BufferPool* pool, BufferPool::ClientHandle* client, int64_t min_buffer_len) : pool_(pool), client_(client), min_buffer_len_(min_buffer_len), allocated_(0) {} Suballocator::~Suballocator() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/suballocator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/suballocator.h b/be/src/runtime/bufferpool/suballocator.h index 6b08a8e..53f4ef5 100644 --- a/be/src/runtime/bufferpool/suballocator.h +++ b/be/src/runtime/bufferpool/suballocator.h @@ -67,7 +67,8 @@ class Suballocator { /// Constructs a suballocator that allocates memory from 'pool' with 'client'. /// Suballocations smaller than 'min_buffer_len' are handled by allocating a /// buffer of 'min_buffer_len' and recursively splitting it. 
- Suballocator(BufferPool* pool, BufferPool::Client* client, int64_t min_buffer_len); + Suballocator( + BufferPool* pool, BufferPool::ClientHandle* client, int64_t min_buffer_len); ~Suballocator(); @@ -134,7 +135,7 @@ class Suballocator { /// The pool and corresponding client to allocate buffers from. BufferPool* pool_; - BufferPool::Client* client_; + BufferPool::ClientHandle* client_; /// The minimum length of buffer to allocate. To serve allocations below this threshold, /// a larger buffer is allocated and split into multiple allocations. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 7a93ca2..3450046 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -1219,7 +1219,6 @@ int DiskIoMgr::free_buffers_idx(int64_t buffer_size) { } Status DiskIoMgr::AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range) { - DCHECK_LE(write_range->len(), max_buffer_size_); unique_lock<mutex> writer_lock(writer->lock_); if (writer->state_ == DiskIoRequestContext::Cancelled) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 932e9c1..347db96 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -556,13 +556,16 @@ void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) { lock_guard<mutex> lock(write_state_lock_); DCHECK(write_in_flight_); write_in_flight_ = false; - // Need to extract 'cb_' because once 'write_in_flight_' is false, the WriteHandle - // may be destroyed. 
+ // Need to extract 'cb_' because once 'write_in_flight_' is false and we release + // 'write_state_lock_', 'this' may be destroyed. cb = move(cb_); + + // Notify before releasing the lock - after the lock is released 'this' may be + // destroyed. + write_complete_cv_.NotifyAll(); } - write_complete_cv_.NotifyAll(); - // Call 'cb' once we've updated the state. We must do this last because once 'cb' is - // called, it is valid to call Read() on the handle. + // Call 'cb' last - once 'cb' is called client code may call Read() or destroy this + // handle. cb(write_status); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index 409c7ce..cab2d87 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -145,6 +145,8 @@ class TmpFileMgr { const TUniqueId& unique_id() const { return unique_id_; } + TmpFileMgr* tmp_file_mgr() const { return tmp_file_mgr_; } + private: friend class File; friend class TmpFileMgrTest; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/testutil/death-test-util.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/death-test-util.h b/be/src/testutil/death-test-util.h index 8522b61..6421fb7 100644 --- a/be/src/testutil/death-test-util.h +++ b/be/src/testutil/death-test-util.h @@ -28,7 +28,7 @@ #define IMPALA_ASSERT_DEBUG_DEATH(fn, msg) \ do { \ ScopedCoredumpDisabler disable_coredumps; \ - ASSERT_DEBUG_DEATH(fn, msg); \ + ASSERT_DEBUG_DEATH((void)fn, msg); \ } while (false); #else // Gtest's ASSERT_DEBUG_DEATH macro has peculiar semantics where in debug builds it http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/util/aligned-new.h ---------------------------------------------------------------------- diff --git 
a/be/src/util/aligned-new.h b/be/src/util/aligned-new.h index 3a4270c..b9197d9 100644 --- a/be/src/util/aligned-new.h +++ b/be/src/util/aligned-new.h @@ -20,6 +20,7 @@ #include <memory> +#include "common/compiler-util.h" #include "common/logging.h" namespace impala { @@ -49,7 +50,7 @@ struct alignas(ALIGNMENT) AlignedNew { } }; -using CacheLineAligned = AlignedNew<64>; +using CacheLineAligned = AlignedNew<CACHE_LINE_SIZE>; } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/util/fake-lock.h ---------------------------------------------------------------------- diff --git a/be/src/util/fake-lock.h b/be/src/util/fake-lock.h new file mode 100644 index 0000000..22e8272 --- /dev/null +++ b/be/src/util/fake-lock.h @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_UTIL_FAKE_LOCK_H +#define IMPALA_UTIL_FAKE_LOCK_H + +namespace impala { + +// Implementation of Boost's lockable interface that does nothing. Used to replace an +// actual lock implementation in template classes if no thread safety is needed. 
+class FakeLock { + public: + FakeLock() {} + void lock() {} + void unlock() {} + bool try_lock() { return true; } + + private: + DISALLOW_COPY_AND_ASSIGN(FakeLock); +}; +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/util/internal-queue.h ---------------------------------------------------------------------- diff --git a/be/src/util/internal-queue.h b/be/src/util/internal-queue.h index 37a9a0c..5e32116 100644 --- a/be/src/util/internal-queue.h +++ b/be/src/util/internal-queue.h @@ -22,55 +22,64 @@ #include <boost/function.hpp> #include <boost/thread/locks.hpp> +#include "util/fake-lock.h" #include "util/spinlock.h" namespace impala { -/// Thread safe fifo-queue. This is an internal queue, meaning the links to nodes -/// are maintained in the object itself. This is in contrast to the stl list which -/// allocates a wrapper Node object around the data. Since it's an internal queue, -/// the list pointers are maintained in the Nodes which is memory owned by the user. -/// The nodes cannot be deallocated while the queue has elements. -/// To use: subclass InternalQueue::Node. +/// FIFO queue implemented as a doubly-linked list with internal pointers. This is in +/// contrast to the STL list which allocates a wrapper Node object around the data. Since +/// it's an internal queue, the list pointers are maintained in the Nodes which is memory +/// owned by the user. The nodes cannot be deallocated while the queue has elements. /// The internal structure is a doubly-linked list. /// NULL <-- N1 <--> N2 <--> N3 --> NULL /// (head) (tail) +/// +/// InternalQueue<T> instantiates a thread-safe queue where the queue is protected by an +/// internal SpinLock. InternalList<T> instantiates a list with no thread safety. +/// +/// To use these data structures, the element to be added to the queue or list must +/// subclass ::Node. +/// /// TODO: this is an ideal candidate to be made lock free. 
-/// T must be a subclass of InternalQueue::Node -template<typename T> -class InternalQueue { +/// T must be a subclass of InternalQueueBase::Node. +template <typename LockType, typename T> +class InternalQueueBase { public: struct Node { public: Node() : parent_queue(NULL), next(NULL), prev(NULL) {} virtual ~Node() {} + /// Returns true if the node is in a queue. + bool in_queue() const { return parent_queue != NULL; } + /// Returns the Next/Prev node or NULL if this is the end/front. T* Next() const { - boost::lock_guard<SpinLock> lock(parent_queue->lock_); + boost::lock_guard<LockType> lock(parent_queue->lock_); return reinterpret_cast<T*>(next); } T* Prev() const { - boost::lock_guard<SpinLock> lock(parent_queue->lock_); + boost::lock_guard<LockType> lock(parent_queue->lock_); return reinterpret_cast<T*>(prev); } private: - friend class InternalQueue; + friend class InternalQueueBase<LockType, T>; /// Pointer to the queue this Node is on. NULL if not on any queue. - InternalQueue* parent_queue; + InternalQueueBase<LockType, T>* parent_queue; Node* next; Node* prev; }; - InternalQueue() : head_(NULL), tail_(NULL), size_(0) {} + InternalQueueBase() : head_(NULL), tail_(NULL), size_(0) {} /// Returns the element at the head of the list without dequeuing or NULL /// if the queue is empty. This is O(1). T* head() const { - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); if (empty()) return NULL; return reinterpret_cast<T*>(head_); } @@ -78,7 +87,7 @@ class InternalQueue { /// Returns the element at the end of the list without dequeuing or NULL /// if the queue is empty. This is O(1). 
T* tail() { - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); if (empty()) return NULL; return reinterpret_cast<T*>(tail_); } @@ -91,7 +100,7 @@ class InternalQueue { DCHECK(node->parent_queue == NULL); node->parent_queue = this; { - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); if (tail_ != NULL) tail_->next = node; node->prev = tail_; tail_ = node; @@ -105,7 +114,7 @@ class InternalQueue { T* Dequeue() { Node* result = NULL; { - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); if (empty()) return NULL; --size_; result = head_; @@ -127,7 +136,7 @@ class InternalQueue { T* PopBack() { Node* result = NULL; { - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); if (empty()) return NULL; --size_; result = tail_; @@ -151,7 +160,7 @@ class InternalQueue { if (node->parent_queue == NULL) return; DCHECK(node->parent_queue == this); { - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); if (node->next == NULL && node->prev == NULL) { // Removing only node DCHECK(node == head_); @@ -184,7 +193,7 @@ class InternalQueue { /// Clears all elements in the list. void Clear() { - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); Node* cur = head_; while (cur != NULL) { Node* tmp = cur; @@ -199,8 +208,7 @@ class InternalQueue { int size() const { return size_; } bool empty() const { return head_ == NULL; } - /// Returns if the target is on the queue. This is O(1) and intended to - /// be used for debugging. + /// Returns if the target is on the queue. This is O(1) and does not acquire any locks. 
bool Contains(const T* target) const { return target->parent_queue == this; } @@ -208,7 +216,7 @@ class InternalQueue { /// Validates the internal structure of the list bool Validate() { int num_elements_found = 0; - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); if (head_ == NULL) { if (tail_ != NULL) return false; if (size() != 0) return false; @@ -236,7 +244,7 @@ class InternalQueue { // false, terminate iteration. It is invalid to call other InternalQueue methods // from 'fn'. void Iterate(boost::function<bool(T*)> fn) { - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); for (Node* current = head_; current != NULL; current = current->next) { if (!fn(reinterpret_cast<T*>(current))) return; } @@ -247,7 +255,7 @@ class InternalQueue { std::stringstream ss; ss << "("; { - boost::lock_guard<SpinLock> lock(lock_); + boost::lock_guard<LockType> lock(lock_); Node* curr = head_; while (curr != NULL) { ss << (void*)curr; @@ -260,11 +268,17 @@ class InternalQueue { private: friend struct Node; - mutable SpinLock lock_; - Node* head_, *tail_; + mutable LockType lock_; + Node *head_, *tail_; int size_; }; -} +// The default LockType is SpinLock. +template <typename T> +class InternalQueue : public InternalQueueBase<SpinLock, T> {}; +// InternalList is a non-threadsafe implementation. +template <typename T> +class InternalList : public InternalQueueBase<FakeLock, T> {}; +} #endif
