Repository: incubator-impala Updated Branches: refs/heads/master 8bdfe0320 -> 42002b91c
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h index da8ef3c..7770fff 100644 --- a/be/src/runtime/bufferpool/buffer-pool.h +++ b/be/src/runtime/bufferpool/buffer-pool.h @@ -20,11 +20,13 @@ #include <stdint.h> #include <string> +#include <vector> #include <boost/scoped_ptr.hpp> #include <boost/thread/locks.hpp> #include "common/atomic.h" #include "common/compiler-util.h" +#include "common/object-pool.h" #include "common/status.h" #include "gutil/macros.h" #include "runtime/tmp-file-mgr.h" @@ -35,9 +37,9 @@ namespace impala { -class BufferAllocator; class ReservationTracker; class RuntimeProfile; +class SystemAllocator; /// A buffer pool that manages memory buffers for all queries in an Impala daemon. /// The buffer pool enforces buffer reservations, limits, and implements policies @@ -57,9 +59,6 @@ class RuntimeProfile; /// with the buffer). Unless otherwise noted, it is not safe to invoke concurrent buffer /// pool operations for the same client. /// -/// TODO: -/// * Decide on, document, and enforce upper limits on page size. -/// /// Pages, Buffers and Pinning /// ========================== /// * A page is a logical block of memory that can reside in memory or on disk. @@ -146,9 +145,9 @@ class RuntimeProfile; /// operations with the same Client, PageHandle or BufferHandle. class BufferPool : public CacheLineAligned { public: + class BufferAllocator; class BufferHandle; class ClientHandle; - class Client; class PageHandle; /// Constructs a new buffer pool. @@ -240,79 +239,44 @@ class BufferPool : public CacheLineAligned { Status TransferBuffer(ClientHandle* src_client, BufferHandle* src, ClientHandle* dst_client, BufferHandle* dst) WARN_UNUSED_RESULT; + /// Try to release at least 'bytes_to_free' bytes of memory to the system allocator. 
+ /// TODO: once IMPALA-4834 is done and all large allocations are served from the buffer + /// pool, this may not be necessary. + void ReleaseMemory(int64_t bytes_to_free); + + /// Called periodically by a maintenance thread to release unneeded memory back to the + /// system allocator. + void Maintenance(); + /// Print a debug string with the state of the buffer pool. std::string DebugString(); int64_t min_buffer_len() const { return min_buffer_len_; } - int64_t buffer_bytes_limit() const { return buffer_bytes_limit_; } + + /// Generous upper bounds on page and buffer size and the number of different + /// power-of-two buffer sizes. + static constexpr int LOG_MAX_BUFFER_BYTES = 48; + static constexpr int64_t MAX_BUFFER_BYTES = 1L << LOG_MAX_BUFFER_BYTES; + + protected: + friend class BufferPoolTest; + /// Test helper: get a reference to the allocator. + BufferAllocator* allocator() { return allocator_.get(); } private: DISALLOW_COPY_AND_ASSIGN(BufferPool); + class Client; + class FreeBufferArena; class PageList; struct Page; - /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already - /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the - /// reservation. This function may acquire 'clean_pages_lock_' and Page::lock so - /// no locks lower in the lock acquisition order (see buffer-pool-internal.h) should be - /// held by the caller. - Status AllocateBufferInternal( - ClientHandle* client, int64_t len, BufferHandle* buffer) WARN_UNUSED_RESULT; - - /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates - /// internal state but does not release to any reservation. - void FreeBufferInternal(BufferHandle* buffer); - - /// Decrease 'buffer_bytes_remaining_' by up to 'len', down to a minimum of 0. - /// Returns the amount it was decreased by. 
- int64_t DecreaseBufferBytesRemaining(int64_t max_decrease); - - /// Adds a clean page 'page' to the global clean pages list, unless the page is in the - /// process of being cleaned up. Caller must hold the page's client's lock via - /// 'client_lock' so that moving the page between a client list and the global free - /// page list is atomic. Caller must not hold 'clean_pages_lock_' or any Page::lock. - void AddCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* page); - - /// Removes a clean page 'page' from the global clean pages list, if present. Returns - /// true if it was present. Caller must hold the page's client's lock via - /// 'client_lock' so that moving the page between list is atomic and there is not a - /// window so that moving the page between a client list and the global free page list - /// is atomic. Caller must not hold 'clean_pages_lock_' or any Page::lock. - bool RemoveCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* page); - - /// Evict at least 'bytes_to_evict' bytes of clean pages and free the associated - /// buffers with 'allocator_'. Any bytes freed in excess of 'bytes_to_evict' are - /// added to 'buffer_bytes_remaining_.' - /// - /// Returns an error and adds any freed bytes to 'buffer_bytes_remaining_' if not - /// enough bytes could be evicted. This will only happen if there is an internal - /// bug: if all clients write out enough dirty pages to stay within their reservation, - /// then there should always be enough clean pages. - Status EvictCleanPages(int64_t bytes_to_evict); - /// Allocator for allocating and freeing all buffer memory. + /// Allocator for allocating and freeing all buffer memory and managing lists of free + /// buffers and clean pages. boost::scoped_ptr<BufferAllocator> allocator_; /// The minimum length of a buffer in bytes. All buffers and pages are a power-of-two /// multiple of this length. This is always a power of two. 
const int64_t min_buffer_len_; - - /// The maximum physical memory in bytes that can be used for buffers. - const int64_t buffer_bytes_limit_; - - /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for - /// allocating new buffers. Must be updated atomically before a new buffer is - /// allocated or after an existing buffer is freed. - /// TODO: reconsider this to avoid all threads contending on this one value. - AtomicInt64 buffer_bytes_remaining_; - - /// Unpinned pages that have had their contents written to disk. These pages can be - /// evicted to allocate a buffer for any client. Pages are evicted in FIFO order, - /// so that pages are evicted in approximately the same order that the clients wrote - /// them to disk. 'clean_pages_lock_' protects 'clean_pages_'. - /// TODO: consider breaking up by page size - /// TODO: consider breaking up by core/NUMA node to improve locality - alignas(CACHE_LINE_SIZE) SpinLock clean_pages_lock_; - InternalList<Page> clean_pages_; }; /// External representation of a client of the BufferPool. Clients are used for @@ -390,17 +354,17 @@ class BufferPool::BufferHandle { private: DISALLOW_COPY_AND_ASSIGN(BufferHandle); friend class BufferPool; - friend class BufferAllocator; + friend class SystemAllocator; /// Internal helper to set the handle to an opened state. - void Open(uint8_t* data, int64_t len); + void Open(uint8_t* data, int64_t len, int home_core); /// Internal helper to reset the handle to an unopened state. Inlined to make moving /// efficient. inline void Reset(); /// The client the buffer handle belongs to, used to validate that the correct client - /// is provided in BufferPool method calls. + /// is provided in BufferPool method calls. Set to NULL if the buffer is in a free list. const ClientHandle* client_; /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed. @@ -408,6 +372,10 @@ class BufferPool::BufferHandle { /// Length of the buffer in bytes. 
int64_t len_; + + /// The CPU core that the buffer was allocated from - used to determine which arena + /// it will be added to. + int home_core_; }; /// The handle for a page used by clients of the BufferPool. Each PageHandle should @@ -477,6 +445,7 @@ inline BufferPool::BufferHandle& BufferPool::BufferHandle::operator=( client_ = src.client_; data_ = src.data_; len_ = src.len_; + home_core_ = src.home_core_; src.Reset(); return *this; } @@ -485,6 +454,7 @@ inline void BufferPool::BufferHandle::Reset() { client_ = NULL; data_ = NULL; len_ = -1; + home_core_ = -1; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/free-list-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/free-list-test.cc b/be/src/runtime/bufferpool/free-list-test.cc index 190faa9..7cb80b4 100644 --- a/be/src/runtime/bufferpool/free-list-test.cc +++ b/be/src/runtime/bufferpool/free-list-test.cc @@ -20,7 +20,9 @@ #include "common/object-pool.h" #include "runtime/bufferpool/free-list.h" +#include "runtime/bufferpool/system-allocator.h" #include "testutil/gtest-util.h" +#include "testutil/rand-util.h" #include "common/names.h" @@ -29,8 +31,8 @@ namespace impala { class FreeListTest : public ::testing::Test { protected: virtual void SetUp() override { - allocator_ = obj_pool_.Add(new BufferAllocator(MIN_BUFFER_LEN)); - SeedRng(); + allocator_ = obj_pool_.Add(new SystemAllocator(MIN_BUFFER_LEN)); + RandTestUtil::SeedRng("FREE_LIST_TEST_SEED", &rng_); } virtual void TearDown() override { @@ -38,16 +40,8 @@ class FreeListTest : public ::testing::Test { obj_pool_.Clear(); } - /// Seed 'rng_' with a seed either for the environment or based on the current time. - void SeedRng() { - const char* seed_str = getenv("FREE_LIST_TEST_SEED"); - int64_t seed = seed_str != nullptr ? 
atoi(seed_str) : time(nullptr); - LOG(INFO) << "Random seed: " << seed; - rng_.seed(seed); - } - - void AllocateBuffers(int num_buffers, int64_t buffer_len, - vector<BufferHandle>* buffers) { + void AllocateBuffers( + int num_buffers, int64_t buffer_len, vector<BufferHandle>* buffers) { for (int i = 0; i < num_buffers; ++i) { BufferHandle buffer; ASSERT_OK(allocator_->Allocate(buffer_len, &buffer)); @@ -69,11 +63,10 @@ class FreeListTest : public ::testing::Test { buffers->clear(); } - void FreeBuffers(vector<BufferHandle>* buffers) { - for (BufferHandle& buffer : *buffers) { + void FreeBuffers(vector<BufferHandle>&& buffers) { + for (BufferHandle& buffer : buffers) { allocator_->Free(move(buffer)); } - buffers->clear(); } const static int MIN_BUFFER_LEN = 1024; @@ -85,7 +78,7 @@ class FreeListTest : public ::testing::Test { ObjectPool obj_pool_; /// The buffer allocator, owned by 'obj_pool_'. - BufferAllocator* allocator_; + SystemAllocator* allocator_; }; const int FreeListTest::MIN_BUFFER_LEN; @@ -115,7 +108,8 @@ TEST_F(FreeListTest, SmallList) { std::shuffle(buffers.begin(), buffers.end(), rng_); AddFreeBuffers(&small_list, &buffers); // Shrink list down to LIST_SIZE. - small_list.FreeBuffers(allocator_, max<int64_t>(0, small_list.Size() - LIST_SIZE)); + FreeBuffers( + small_list.GetBuffersToFree(max<int64_t>(0, small_list.Size() - LIST_SIZE))); // The LIST_SIZE buffers with the lowest address should be retained, and the // remaining buffers should have been freed. @@ -125,7 +119,7 @@ TEST_F(FreeListTest, SmallList) { buffers.push_back(move(buffer)); } ASSERT_FALSE(small_list.PopFreeBuffer(&buffer)); - FreeBuffers(&buffers); + FreeBuffers(move(buffers)); } } } @@ -148,7 +142,7 @@ TEST_F(FreeListTest, ReturnOrder) { AddFreeBuffers(&list, &buffers); // Free buffers. Only the buffers with the high addresses should be freed. 
- list.FreeBuffers(allocator_, max<int64_t>(0, list.Size() - LIST_SIZE)); + FreeBuffers(list.GetBuffersToFree(max<int64_t>(0, list.Size() - LIST_SIZE))); // Validate that the buffers with lowest addresses are returned in ascending order. BufferHandle buffer; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/free-list.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/free-list.h b/be/src/runtime/bufferpool/free-list.h index 6cec0e8..ec1057d 100644 --- a/be/src/runtime/bufferpool/free-list.h +++ b/be/src/runtime/bufferpool/free-list.h @@ -26,7 +26,6 @@ #include "common/logging.h" #include "gutil/macros.h" -#include "runtime/bufferpool/buffer-allocator.h" #include "runtime/bufferpool/buffer-pool.h" namespace impala { @@ -75,28 +74,21 @@ class FreeList { std::push_heap(free_list_.begin(), free_list_.end(), HeapCompare); } - /// Frees all the buffers in the list with 'allocator'. Returns the number of bytes - /// freed. - int64_t FreeAll(BufferAllocator* allocator) { - return FreeBuffers(allocator, free_list_.size()); - } - - /// Free 'num_buffers' buffers from the list with 'allocator'. Returns the number of - /// bytes freed. The average time complexity is n log n, where n is the current size of - /// the list. - int64_t FreeBuffers(BufferAllocator* allocator, int64_t num_buffers) { + /// Get the 'num_buffers' buffers with the highest memory address from the list to + /// free. The average time complexity is n log n, where n is the current size of the + /// list. + vector<BufferHandle> GetBuffersToFree(int64_t num_buffers) { + vector<BufferHandle> buffers; DCHECK_LE(num_buffers, free_list_.size()); // Sort the list so we can free the buffers with higher memory addresses. // Note that the sorted list is still a valid min-heap. 
std::sort(free_list_.begin(), free_list_.end(), SortCompare); - int64_t bytes_freed = 0; for (int64_t i = 0; i < num_buffers; ++i) { - bytes_freed += free_list_.back().len(); - allocator->Free(std::move(free_list_.back())); + buffers.emplace_back(std::move(free_list_.back())); free_list_.pop_back(); } - return bytes_freed; + return buffers; } /// Returns the number of buffers currently in the list. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/suballocator-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc index 13bc597..ea515fe 100644 --- a/be/src/runtime/bufferpool/suballocator-test.cc +++ b/be/src/runtime/bufferpool/suballocator-test.cc @@ -29,6 +29,7 @@ #include "runtime/bufferpool/suballocator.h" #include "testutil/death-test-util.h" #include "testutil/gtest-util.h" +#include "testutil/rand-util.h" #include "util/bit-util.h" #include "common/names.h" @@ -43,7 +44,7 @@ namespace impala { class SuballocatorTest : public ::testing::Test { public: virtual void SetUp() override { - SeedRng(); + RandTestUtil::SeedRng("SUBALLOCATOR_TEST_SEED", &rng_); profile_.reset(new RuntimeProfile(&obj_pool_, "test profile")); } @@ -63,19 +64,6 @@ class SuballocatorTest : public ::testing::Test { const static int64_t TEST_BUFFER_LEN = Suballocator::MIN_ALLOCATION_BYTES * 16; protected: - /// Seed 'rng_' with a seed either for the environment or based on the current time. - void SeedRng() { - const char* seed_str = getenv("SUBALLOCATOR_TEST_SEED"); - int64_t seed; - if (seed_str != nullptr) { - seed = atoi(seed_str); - } else { - seed = time(nullptr); - } - LOG(INFO) << "Random seed: " << seed; - rng_.seed(seed); - } - /// Initialize 'buffer_pool_' and 'global_reservation_' with a limit of 'total_mem' /// bytes of buffers of minimum length 'min_buffer_len'. 
void InitPool(int64_t min_buffer_len, int total_mem) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/suballocator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/suballocator.h b/be/src/runtime/bufferpool/suballocator.h index 53f4ef5..e6834ad 100644 --- a/be/src/runtime/bufferpool/suballocator.h +++ b/be/src/runtime/bufferpool/suballocator.h @@ -89,10 +89,10 @@ class Suballocator { /// failed Allocate() call). void Free(std::unique_ptr<Suballocation> allocation); - /// Generous upper bounds on the max allocation size and the number of different + /// Upper bounds on the max allocation size and the number of different /// power-of-two allocation sizes. Used to bound the number of free lists. - static constexpr int LOG_MAX_ALLOCATION_BYTES = 48; - static constexpr int64_t MAX_ALLOCATION_BYTES = 1L << LOG_MAX_ALLOCATION_BYTES; + static constexpr int LOG_MAX_ALLOCATION_BYTES = BufferPool::LOG_MAX_BUFFER_BYTES; + static constexpr int64_t MAX_ALLOCATION_BYTES = BufferPool::MAX_BUFFER_BYTES; /// Don't support allocations less than 4kb to avoid high overhead. static constexpr int LOG_MIN_ALLOCATION_BYTES = 12; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/system-allocator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/system-allocator.cc b/be/src/runtime/bufferpool/system-allocator.cc new file mode 100644 index 0000000..0ea429a --- /dev/null +++ b/be/src/runtime/bufferpool/system-allocator.cc @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/bufferpool/system-allocator.h" + +#include "util/bit-util.h" + +namespace impala { + +SystemAllocator::SystemAllocator(int64_t min_buffer_len) + : min_buffer_len_(min_buffer_len) { + DCHECK(BitUtil::IsPowerOf2(min_buffer_len)); +} + +Status SystemAllocator::Allocate(int64_t len, BufferPool::BufferHandle* buffer) { + DCHECK_GE(len, min_buffer_len_); + DCHECK_LE(len, BufferPool::MAX_BUFFER_BYTES); + DCHECK(BitUtil::IsPowerOf2(len)) << len; + + uint8_t* alloc = reinterpret_cast<uint8_t*>(malloc(len)); + if (alloc == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len); + buffer->Open(alloc, len, CpuInfo::GetCurrentCore()); + return Status::OK(); +} + +void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) { + free(buffer.data()); + buffer.Reset(); // Avoid DCHECK in ~BufferHandle(). +} +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/system-allocator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/system-allocator.h b/be/src/runtime/bufferpool/system-allocator.h new file mode 100644 index 0000000..8b4a544 --- /dev/null +++ b/be/src/runtime/bufferpool/system-allocator.h @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_SYSTEM_ALLOCATOR_H +#define IMPALA_RUNTIME_SYSTEM_ALLOCATOR_H + +#include "common/status.h" + +#include "runtime/bufferpool/buffer-pool.h" + +namespace impala { + +/// The underlying memory allocator for the buffer pool that allocates buffer memory from +/// the system. All buffers are allocated through the BufferPool's SystemAllocator. The +/// allocator only handles allocating buffers that are power-of-two multiples of the +/// minimum buffer length. +/// +/// TODO: +/// * Allocate memory with mmap() instead of malloc(). +class SystemAllocator { + public: + SystemAllocator(int64_t min_buffer_len); + + /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple + /// of the minimum buffer length. + Status Allocate(int64_t len, BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT; + + /// Free the memory for a previously-allocated buffer. 
+ void Free(BufferPool::BufferHandle&& buffer); + + private: + const int64_t min_buffer_len_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 5d9ced1..b973a84 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -380,9 +380,6 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) { DCHECK(process_mem_tracker != NULL); free_buffer_mem_tracker_.reset( new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false)); - // If we hit the process limit, see if we can reclaim some memory by removing - // previously allocated (but unused) io buffers. - process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this)); for (int i = 0; i < disk_queues_.size(); ++i) { disk_queues_[i] = new DiskQueue(i); @@ -759,12 +756,17 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read return GetBufferDesc(reader, reader->mem_tracker_, range, buffer, buffer_size); } -void DiskIoMgr::GcIoBuffers() { +void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) { unique_lock<mutex> lock(free_buffers_lock_); int buffers_freed = 0; int bytes_freed = 0; + // Free small-to-large to avoid retaining many small buffers and fragmenting memory. 
for (int idx = 0; idx < free_buffers_.size(); ++idx) { - for (uint8_t* buffer : free_buffers_[idx]) { + std::list<uint8_t*>* free_buffers = &free_buffers_[idx]; + while ( + !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) { + uint8_t* buffer = free_buffers->front(); + free_buffers->pop_front(); int64_t buffer_size = (1LL << idx) * min_buffer_size_; delete[] buffer; free_buffer_mem_tracker_->Release(buffer_size); @@ -773,7 +775,7 @@ void DiskIoMgr::GcIoBuffers() { ++buffers_freed; bytes_freed += buffer_size; } - free_buffers_[idx].clear(); + if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break; } if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) { @@ -783,7 +785,7 @@ void DiskIoMgr::GcIoBuffers() { ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed); } if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) { - ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->set_value(0); + ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed); } } @@ -1125,7 +1127,7 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange( DCHECK(reader->mem_tracker_ != NULL); bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY; if (!enough_memory) { - // Low memory, GC and try again. + // Low memory, GC all the buffers and try again. GcIoBuffers(); enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index b7222cb..b70c9c8 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -774,6 +774,10 @@ class DiskIoMgr { /// later reuse. void CacheOrCloseFileHandle(const char* fname, HdfsCachedFileHandle* fid, bool close); + /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if + /// 'bytes_to_free' is -1. 
+ void GcIoBuffers(int64_t bytes_to_free = -1); + /// Default ready buffer queue capacity. This constant doesn't matter too much /// since the system dynamically adjusts. static const int DEFAULT_QUEUE_CAPACITY; @@ -902,12 +906,6 @@ class DiskIoMgr { /// Returns a buffer desc object which can now be used for another reader. void ReturnBufferDesc(BufferDescriptor* desc); - /// Garbage collect all unused io buffers. This is currently only triggered when the - /// process wide limit is hit. This is not good enough. While it is sufficient for - /// the IoMgr, other components do not trigger this GC. - /// TODO: make this run periodically? - void GcIoBuffers(); - /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be NULL), either /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index dd27a91..1fa863e 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -313,12 +313,6 @@ Status ExecEnv::StartServices() { // Limit of -1 means no memory limit. mem_tracker_.reset(new MemTracker(TcmallocMetric::PHYSICAL_BYTES_RESERVED, bytes_limit > 0 ? bytes_limit : -1, "Process")); - - // Since tcmalloc does not free unused memory, we may exceed the process mem limit even - // if Impala is not actually using that much memory. Add a callback to free any unused - // memory if we hit the process limit. - mem_tracker_->AddGcFunction(boost::bind(&MallocExtension::ReleaseFreeMemory, - MallocExtension::instance())); #else // tcmalloc metrics aren't defined in ASAN builds, just use the default behavior to // track process memory usage (sum of all children trackers). 
@@ -338,6 +332,22 @@ Status ExecEnv::StartServices() { RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get())); + mem_tracker_->AddGcFunction( + [this](int64_t bytes_to_free) { disk_io_mgr_->GcIoBuffers(bytes_to_free); }); + + // TODO: IMPALA-3200: register BufferPool::ReleaseMemory() as GC function. + +#ifndef ADDRESS_SANITIZER + // Since tcmalloc does not free unused memory, we may exceed the process mem limit even + // if Impala is not actually using that much memory. Add a callback to free any unused + // memory if we hit the process limit. TCMalloc GC must run last, because other GC + // functions may have released memory to TCMalloc, and TCMalloc may have cached it + // instead of releasing it to the system. + mem_tracker_->AddGcFunction([](int64_t bytes_to_free) { + MallocExtension::instance()->ReleaseToSystem(bytes_to_free); + }); +#endif + // Start services in order to ensure that dependencies between them are met if (enable_webserver_) { AddDefaultUrlCallbacks(webserver_.get(), mem_tracker_.get(), metrics_.get()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/mem-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc index a48ede9..9e3775d 100644 --- a/be/src/runtime/mem-tracker.cc +++ b/be/src/runtime/mem-tracker.cc @@ -292,26 +292,37 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& deta return status; } +void MemTracker::AddGcFunction(GcFunction f) { + gc_functions_.push_back(f); +} + bool MemTracker::GcMemory(int64_t max_consumption) { if (max_consumption < 0) return true; lock_guard<mutex> l(gc_lock_); if (consumption_metric_ != NULL) consumption_->Set(consumption_metric_->value()); - uint64_t pre_gc_consumption = consumption(); + int64_t pre_gc_consumption = consumption(); // Check if someone gc'd before us if (pre_gc_consumption < max_consumption) return false; if (num_gcs_metric_ 
!= NULL) num_gcs_metric_->Increment(1); + int64_t curr_consumption = pre_gc_consumption; // Try to free up some memory for (int i = 0; i < gc_functions_.size(); ++i) { - gc_functions_[i](); + // Try to free up the amount we are over plus some extra so that we don't have to + // immediately GC again. Don't free all the memory since that can be unnecessarily + // expensive. + const int64_t EXTRA_BYTES_TO_FREE = 512L * 1024L * 1024L; + int64_t bytes_to_free = curr_consumption - max_consumption + EXTRA_BYTES_TO_FREE; + gc_functions_[i](bytes_to_free); if (consumption_metric_ != NULL) RefreshConsumptionFromMetric(); - if (consumption() <= max_consumption) break; + curr_consumption = consumption(); + if (max_consumption - curr_consumption <= EXTRA_BYTES_TO_FREE) break; } if (bytes_freed_by_last_gc_metric_ != NULL) { - bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - consumption()); + bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - curr_consumption); } - return consumption() > max_consumption; + return curr_consumption > max_consumption; } void MemTracker::GcTcmalloc() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/mem-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h index acf3b46..863d3c7 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -294,14 +294,17 @@ class MemTracker { MemTracker* parent() const { return parent_; } - /// Signature for function that can be called to free some memory after limit is reached. - /// See the class header for further details on what these functions should do. - typedef boost::function<void ()> GcFunction; - - /// Add a function 'f' to be called if the limit is reached. + /// Signature for function that can be called to free some memory after limit is + /// reached. 
The function should try to free at least 'bytes_to_free' bytes of + /// memory. See the class header for further details on the expected behaviour of + /// these functions. + typedef std::function<void(int64_t bytes_to_free)> GcFunction; + + /// Add a function 'f' to be called if the limit is reached, if none of the other + /// previously-added GC functions were successful at freeing up enough memory. /// 'f' does not need to be thread-safe as long as it is added to only one MemTracker. /// Note that 'f' must be valid for the lifetime of this MemTracker. - void AddGcFunction(GcFunction f) { gc_functions_.push_back(f); } + void AddGcFunction(GcFunction f); /// Register this MemTracker's metrics. Each key will be of the form /// "<prefix>.<metric name>". http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index b220fff..dee3c64 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -481,7 +481,9 @@ TEST_F(TmpFileMgrTest, TestEncryptionDuringCancellation) { string file_path = handle->TmpFilePath(); // Cancel the write - prior to the IMPALA-4820 fix decryption could race with the write. - ASSERT_OK(file_group.CancelWriteAndRestoreData(move(handle), data_mem_range)); + handle->Cancel(); + handle->WaitForWrite(); + ASSERT_OK(file_group.RestoreData(move(handle), data_mem_range)); WaitForCallbacks(1); // Read the data from the scratch file and check that the plaintext isn't present. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index c939d88..abba192 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -407,13 +407,11 @@ Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) { return Status::OK(); } -Status TmpFileMgr::FileGroup::CancelWriteAndRestoreData( +Status TmpFileMgr::FileGroup::RestoreData( unique_ptr<WriteHandle> handle, MemRange buffer) { DCHECK_EQ(handle->write_range_->data(), buffer.data()); DCHECK_EQ(handle->len(), buffer.len()); - handle->Cancel(); - - handle->WaitForWrite(); + DCHECK(!handle->write_in_flight_); // Decrypt after the write is finished, so that we don't accidentally write decrypted // data to disk. Status status; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index cab2d87..d66c527 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -117,8 +117,7 @@ class TmpFileMgr { /// a different thread when the write completes successfully or unsuccessfully or is /// cancelled. /// - /// 'handle' must be destroyed by passing the DestroyWriteHandle() or - /// CancelWriteAndRestoreData(). + /// 'handle' must be destroyed by passing the DestroyWriteHandle() or RestoreData(). Status Write(MemRange buffer, WriteDoneCallback cb, std::unique_ptr<WriteHandle>* handle) WARN_UNUSED_RESULT; @@ -127,11 +126,11 @@ class TmpFileMgr { /// after a write successfully completes. Status Read(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT; - /// Cancels the write referenced by 'handle' and destroy associate resources. 
Also - /// restore the original data in the 'buffer' passed to Write(), decrypting or - /// decompressing as necessary. The cancellation always succeeds, but an error - /// is returned if restoring the data fails. - Status CancelWriteAndRestoreData( + /// Restore the original data in the 'buffer' passed to Write(), decrypting or + /// decompressing as necessary. Returns an error if restoring the data fails. + /// The write must not be in-flight - the caller is responsible for waiting for + /// the write to complete. + Status RestoreData( std::unique_ptr<WriteHandle> handle, MemRange buffer) WARN_UNUSED_RESULT; /// Wait for the in-flight I/Os to complete and destroy resources associated with @@ -255,19 +254,27 @@ class TmpFileMgr { /// handle can be passed to FileGroup::Read() to read back the data zero or more times. /// FileGroup::DestroyWriteHandle() can be called at any time to destroy the handle and /// allow reuse of the scratch file range written to. Alternatively, - /// FileGroup::CancelWriteAndRestoreData() can be called to reverse the effects of - /// FileGroup::Write() by destroying the handle and restoring the original data to the - /// buffer, so long as the data in the buffer was not modified by the caller. + /// FileGroup::RestoreData() can be called to reverse the effects of FileGroup::Write() + /// by destroying the handle and restoring the original data to the buffer, so long as + /// the data in the buffer was not modified by the caller. /// /// Public methods of WriteHandle are safe to call concurrently from multiple threads. class WriteHandle { public: /// The write must be destroyed by passing it to FileGroup - destroying it before - /// cancelling the write is an error. - ~WriteHandle() { - DCHECK(!write_in_flight_); - DCHECK(is_cancelled_); - } + /// the write completes is an error. + ~WriteHandle() { DCHECK(!write_in_flight_); } + + /// Cancels the write asynchronously. After Cancel() is called, writes are not + /// retried. 
The write callback may be called with a CANCELLED status (unless + /// it succeeded or encountered a different error first). + /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it. + void Cancel(); + + /// Blocks until the write completes either successfully or unsuccessfully. + /// May return before the write callback has been called. + /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it. + void WaitForWrite(); /// Path of temporary file backing the block. Intended for use in testing. /// Returns empty string if no backing file allocated. @@ -296,13 +303,6 @@ class TmpFileMgr { Status RetryWrite(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset) WARN_UNUSED_RESULT; - /// Cancels the write asynchronously. After Cancel() is called, writes are not - /// retried. - void Cancel(); - - /// Blocks until the write completes either successfully or unsuccessfully. - void WaitForWrite(); - /// Called when the write has completed successfully or not. Sets 'write_in_flight_' /// then calls 'cb_'. void WriteComplete(const Status& write_status); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/testutil/cpu-util.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/cpu-util.h b/be/src/testutil/cpu-util.h new file mode 100644 index 0000000..d465c52 --- /dev/null +++ b/be/src/testutil/cpu-util.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_TESTUTIL_CPU_UTIL_H_ +#define IMPALA_TESTUTIL_CPU_UTIL_H_ + +#include <pthread.h> +#include <sched.h> + +#include "testutil/gtest-util.h" +#include "testutil/rand-util.h" +#include "util/cpu-info.h" + +namespace impala { + +class CpuTestUtil { + public: + /// Set the thread affinity so that it always runs on 'core'. Fail the test if + /// unsuccessful. + static void PinToCore(int core) { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(core, &cpuset); + ASSERT_EQ(0, pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset)) + << core; + ASSERT_EQ(core, CpuInfo::GetCurrentCore()); + } + + /// Reset the thread affinity of the current thread to all cores. + static void ResetAffinity() { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + for (int i = 0; i < CpuInfo::GetMaxNumCores(); ++i) { + CPU_SET(i, &cpuset); + } + ASSERT_EQ(0, pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset)); + } + + /// Choose a random core in [0, CpuInfo::num_cores()) and pin the current thread to it. + /// Uses 'rng' for randomness. + static void PinToRandomCore(std::mt19937* rng) { + int core = std::uniform_int_distribution<int>(0, CpuInfo::num_cores() - 1)(*rng); + PinToCore(core); + } + + /// Setup a fake NUMA setup where CpuInfo will report a NUMA configuration other than + /// the system's actual configuration. If 'has_numa' is true, sets it up as three NUMA + /// nodes with the cores distributed between them. Otherwise sets it up as a single + /// NUMA node. 
+ static void SetupFakeNuma(bool has_numa) { + std::vector<int> core_to_node(CpuInfo::GetMaxNumCores()); + int num_nodes = has_numa ? 3 : 1; + for (int i = 0; i < core_to_node.size(); ++i) core_to_node[i] = i % num_nodes; + CpuInfo::InitFakeNumaForTest(num_nodes, core_to_node); + LOG(INFO) << CpuInfo::DebugString(); + } +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/testutil/rand-util.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/rand-util.h b/be/src/testutil/rand-util.h new file mode 100644 index 0000000..6cdbed4 --- /dev/null +++ b/be/src/testutil/rand-util.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_TESTUTIL_RAND_UTIL_H_ +#define IMPALA_TESTUTIL_RAND_UTIL_H_ + +#include <cstdint> +#include <cstdlib> +#include <random> + +#include "common/logging.h" + +namespace impala { + +/// Test helpers for randomised tests. +class RandTestUtil { + public: + /// Seed 'rng' with a seed either from the environment variable 'env_var' or the + /// current time. 
+ static void SeedRng(const char* env_var, std::mt19937* rng) { + const char* seed_str = getenv(env_var); + int64_t seed; + if (seed_str != nullptr) { + seed = atoi(seed_str); + } else { + seed = time(nullptr); + } + LOG(INFO) << "Random seed (overridable with " << env_var << "): " << seed; + rng->seed(seed); + } +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/util/cpu-info.cc ---------------------------------------------------------------------- diff --git a/be/src/util/cpu-info.cc b/be/src/util/cpu-info.cc index e5d2a83..a32571e 100644 --- a/be/src/util/cpu-info.cc +++ b/be/src/util/cpu-info.cc @@ -77,6 +77,8 @@ int CpuInfo::max_num_cores_; string CpuInfo::model_name_ = "unknown"; int CpuInfo::max_num_numa_nodes_; unique_ptr<int[]> CpuInfo::core_to_numa_node_; +vector<vector<int>> CpuInfo::numa_node_to_cores_; +vector<int> CpuInfo::numa_node_core_idx_; static struct { string name; @@ -182,6 +184,7 @@ void CpuInfo::InitNuma() { // Assume a single NUMA node. 
max_num_numa_nodes_ = 1; std::fill_n(core_to_numa_node_.get(), max_num_cores_, 0); + InitNumaNodeToCores(); return; } @@ -215,6 +218,29 @@ void CpuInfo::InitNuma() { core_to_numa_node_[core] = 0; } } + InitNumaNodeToCores(); +} + +void CpuInfo::InitFakeNumaForTest( + int max_num_numa_nodes, const vector<int>& core_to_numa_node) { + DCHECK_EQ(max_num_cores_, core_to_numa_node.size()); + max_num_numa_nodes_ = max_num_numa_nodes; + for (int i = 0; i < max_num_cores_; ++i) { + core_to_numa_node_[i] = core_to_numa_node[i]; + } + numa_node_to_cores_.clear(); + InitNumaNodeToCores(); +} + +void CpuInfo::InitNumaNodeToCores() { + DCHECK(numa_node_to_cores_.empty()); + numa_node_to_cores_.resize(max_num_numa_nodes_); + numa_node_core_idx_.resize(max_num_cores_); + for (int core = 0; core < max_num_cores_; ++core) { + vector<int>* cores_of_node = &numa_node_to_cores_[core_to_numa_node_[core]]; + numa_node_core_idx_[core] = cores_of_node->size(); + cores_of_node->push_back(core); + } } void CpuInfo::VerifyCpuRequirements() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/util/cpu-info.h ---------------------------------------------------------------------- diff --git a/be/src/util/cpu-info.h b/be/src/util/cpu-info.h index 28af3e5..38d6782 100644 --- a/be/src/util/cpu-info.h +++ b/be/src/util/cpu-info.h @@ -21,6 +21,7 @@ #include <memory> #include <string> +#include <vector> #include <boost/cstdint.hpp> #include "common/logging.h" @@ -109,10 +110,35 @@ class CpuInfo { /// [0, GetMaxNumCores()). static int GetNumaNodeOfCore(int core) { DCHECK_LE(0, core); - DCHECK_LT(core, max_num_numa_nodes_); + DCHECK_LT(core, max_num_cores_); return core_to_numa_node_[core]; } + /// Returns the cores in a NUMA node. 'node' must be in the range + /// [0, GetMaxNumNumaNodes()). 
+ static const std::vector<int>& GetCoresOfNumaNode(int node) { + DCHECK_LE(0, node); + DCHECK_LT(node, max_num_numa_nodes_); + return numa_node_to_cores_[node]; + } + + /// Returns the cores in the same NUMA node as 'core'. 'core' must be in the range + /// [0, GetMaxNumCores()). + static const std::vector<int>& GetCoresOfSameNumaNode(int core) { + DCHECK_LE(0, core); + DCHECK_LT(core, max_num_cores_); + return GetCoresOfNumaNode(GetNumaNodeOfCore(core)); + } + + /// Returns the index of the given core within the vector returned by + /// GetCoresOfNumaNode() and GetCoresOfSameNumaNode(). 'core' must be in the range + /// [0, GetMaxNumCores()). + static int GetNumaNodeCoreIdx(int core) { + DCHECK_LE(0, core); + DCHECK_LT(core, max_num_cores_); + return numa_node_core_idx_[core]; + } + /// Returns the model name of the cpu (e.g. Intel i7-2600) static std::string model_name() { DCHECK(initialized_); @@ -150,10 +176,23 @@ class CpuInfo { bool reenable_; }; + protected: + friend class CpuTestUtil; + + /// Setup fake NUMA info to simulate NUMA for backend tests. Sets up CpuInfo to + /// simulate 'max_num_numa_nodes' with 'core_to_numa_node' specifying the NUMA node + /// of each core in [0, GetMaxNumCores()). + static void InitFakeNumaForTest( + int max_num_numa_nodes, const std::vector<int>& core_to_numa_node); + private: /// Initialize NUMA-related state - called from Init(); static void InitNuma(); + /// Initialize 'numa_node_to_cores_' and 'numa_node_core_idx_' based on + /// 'max_num_numa_nodes_' and 'core_to_numa_node_'. Called from InitNuma(); + static void InitNumaNodeToCores(); + /// Populates the arguments with information about this machine's caches. /// The values returned are not reliable in some environments, e.g. RHEL5 on EC2, so /// so we will keep this as a private method. @@ -173,7 +212,14 @@ class CpuInfo { /// Array with 'max_num_cores_' entries, each of which is the NUMA node of that core. 
static std::unique_ptr<int[]> core_to_numa_node_; -}; + /// Vector with 'max_num_numa_nodes_' entries, each of which is a vector of the cores + /// belonging to that NUMA node. + static std::vector<std::vector<int>> numa_node_to_cores_; + + /// Vector with 'max_num_cores_' entries, each of which is the index of that core in its + /// NUMA node. + static std::vector<int> numa_node_core_idx_; +}; } #endif
