cmcfarlen commented on code in PR #11871: URL: https://github.com/apache/trafficserver/pull/11871#discussion_r1844293636
########## src/iocore/cache/RamCacheContainer.cc: ########## @@ -0,0 +1,322 @@ +/** @file + + A cache wrapper that duplicates the underlying cache onto several NUMA nodes + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "P_Cache.h" +#include <array> +#include <mutex> +#include "tscore/NUMADebug.h" +#include "tscore/ink_thread.h" +#include <numaif.h> + +class RamCacheContainer : public RamCache +{ + std::vector<RamCache *> caches; + + int64_t max_bytes = 0; + StripeSM *stripe = nullptr; + bool init_called = false; + RamCache *get_cache(unsigned int my_node, unsigned int node); + +public: + void init_one_cache(); + // returns 1 on found/stored, 0 on not found/stored, if provided auxkey1 and auxkey2 must match + int get(CryptoHash *key, Ptr<IOBufferData> *ret_data, uint64_t auxkey = 0) override; + int put(CryptoHash *key, IOBufferData *data, uint32_t len, bool copy = false, uint64_t auxkey = 0) override; + int fixup(const CryptoHash *key, uint64_t old_auxkey, uint64_t new_auxkey) override; + int64_t size() const override; + void init(int64_t max_bytes_, StripeSM *stripe_) override; + RamCacheContainer(); + ~RamCacheContainer() override; +}; + +extern int cache_config_ram_cache_algorithm; +extern int 
cache_config_ram_cache_numa_duplicate; + +RamCacheContainer::RamCacheContainer() : caches(numa_max_node() + 1, nullptr) {} + +RamCacheContainer::~RamCacheContainer() +{ + for (auto c : caches) + delete c; +} + +RamCache * +new_RamCacheFromConfig() +{ + switch (cache_config_ram_cache_algorithm) { + default: + case RAM_CACHE_ALGORITHM_CLFUS: + return new_RamCacheCLFUS(); + case RAM_CACHE_ALGORITHM_LRU: + return new_RamCacheLRU(); + } +} + +RamCache * +new_RamCacheContainer() +{ + return new RamCacheContainer; +} + +void +RamCacheContainer::init_one_cache() +{ + unsigned int my_node = 0; + getcpu(nullptr, &my_node); + if (caches[my_node]) { + ink_error("Attempt to double-init duplicated cache!"); + } else { + RamCache *cache = new_RamCacheFromConfig(); + cache->init(max_bytes, stripe); + caches[my_node] = cache; + } +} + +RamCache * +RamCacheContainer::get_cache(unsigned int my_node, unsigned int node) +{ + return caches[node]; +} + +static void * +ram_cache_container_thread_init_func(void *arg) +{ + ((RamCacheContainer *)arg)->init_one_cache(); + return nullptr; +} + +void +RamCacheContainer::init(int64_t max_bytes_, StripeSM *stripe_) +{ + max_bytes = max_bytes_; + stripe = stripe_; + init_called = true; + std::vector<ink_thread> threads; + for (unsigned int node = 0; node < (unsigned int)caches.size(); ++node) { + ink_thread t; + hwloc_obj_t obj = hwloc_get_obj_by_type(ink_get_topology(), HWLOC_OBJ_NODE, node); + ink_thread_create(&t, ram_cache_container_thread_init_func, (void *)this, false, 0, nullptr, obj->cpuset); + threads.push_back(t); + } + for (auto t : threads) { + ink_thread_join(t); + } + for (auto cache : caches) { + if (!cache) { + ink_fatal("Failed to initialize NUMA local ram cache."); + } + } +} + +constexpr size_t PAGE_SIZE = 4096; + +static void * +align_pointer_to_page(const void *ptr) +{ + // Align to the page size (move_page requires that) + return reinterpret_cast<void *>(reinterpret_cast<intptr_t>(ptr) & (~(PAGE_SIZE - 1))); +} + +static 
size_t +page_count(const void *ptr, size_t size) +{ + auto start = reinterpret_cast<intptr_t>(ptr); + return (start + size + (PAGE_SIZE - 1)) / PAGE_SIZE - start / PAGE_SIZE; +} + +// returns true if consistent +static bool +check_pages_consistency(void *data, size_t size, const char *name = "") +{ + unsigned int my_node = 0; + getcpu(nullptr, &my_node); + + size_t count = page_count(data, size); + if (count == 0) + return true; + + data = align_pointer_to_page(data); + + intptr_t pos = reinterpret_cast<intptr_t>(data); + std::vector<void *> pages(count); + std::vector<int> status(count, 0); + for (size_t i = 0; i < count; ++i) { + pages[i] = reinterpret_cast<void *>(pos); + pos += PAGE_SIZE; + } + auto result = move_pages(0, count, pages.data(), nullptr, status.data(), 0); + if (result < 0) { + ink_notice("move_pages failed"); + return false; + } + + bool inconsistent = false; + if (status[0] >= 0) { + for (size_t i = 1; i < count; ++i) { + if (status[i] >= 0 && status[i] != status[0]) { + inconsistent = true; + break; + } + } + } + if (inconsistent) { + std::string print(count, '?'); + for (size_t i = 0; i < count; ++i) { + print[i] = '0' + status[i]; + } + ink_notice("Inconsistent pages at %s when putting data into cache %s, execution node=%d", name, print.c_str(), my_node); + } else { + // ink_notice("Consistent pages at %s when putting data into cache, node=%d", name, status[0]); + } + return !inconsistent; +} + +int +RamCacheContainer::get(CryptoHash *key, Ptr<IOBufferData> *ret_data, uint64_t auxkey) +{ + unsigned int my_node = 0; + getcpu(nullptr, &my_node); + // Do we have it? + RamCache *my_cache = get_cache(my_node, my_node); + if (!my_cache) + return 0; + if (my_cache->get(key, ret_data, auxkey)) { + // From difference, can tell if its not coherent + NUMA_CHECK((*ret_data)->data(), 0); + NUMA_CHECK((*ret_data)->data(), (*ret_data)->block_size()); + return 1; + } + // Do we have it on some other node? 
+ + for (unsigned i = 0; i < caches.size(); i++) + if (i != my_node) { + RamCache *cache = get_cache(my_node, i); + if (cache && cache->get(key, ret_data, auxkey)) { + my_cache->put(key, (*ret_data).get(), (*ret_data)->block_size(), false, auxkey); + return 1; + } + } + return 0; +} + +static void +move_pages_to_numa_zone(void *data, size_t size, int dest_numa) +{ + size_t pcount = page_count(data, size); + if (pcount == 0) + return; + + data = align_pointer_to_page(data); + intptr_t pos = reinterpret_cast<intptr_t>(data); + std::vector<void *> pages(pcount); + std::vector<int> nodes(pcount); + std::vector<int> status(pcount, 0); + for (size_t i = 0; i < pcount; ++i) { + nodes[i] = dest_numa; + pages[i] = reinterpret_cast<void *>(pos); + pos += PAGE_SIZE; + } + auto result = move_pages(0, pcount, pages.data(), nodes.data(), status.data(), MPOL_MF_MOVE); + if (result < 0) { + ink_notice("move_pages failed"); + return; + } + size_t failures = 0; + for (auto &s : status) { + if (s < 0) + failures++; + } + if (failures > 0) { + ink_notice("move_pages_to_numa_zone had %" PRIuPTR " failures", (uintptr_t)failures); + } + // TODO: error reporting + bool inconsistent = false; + if (status[0] >= 0) { + for (size_t i = 1; i < pcount; ++i) { + if (status[i] >= 0 && status[i] != status[0]) { + inconsistent = true; + break; + } + } + } + if (inconsistent) { + std::string print(pcount, '?'); + for (size_t i = 0; i < pcount; ++i) { + print[i] = '0' + status[i]; + } + ink_notice("Inconsistent pages after move_pages %s", print.c_str()); + } +} + +static void +move_pages_to_current_numa_zone(void *data, size_t size) Review Comment: This function is unused ########## src/iocore/net/Server.cc: ########## @@ -88,49 +104,205 @@ Server::close() return sock.close(); } +#if TS_USE_NUMA +// Assumes that threads can be assigned to NUMA zones as 0,1,2,3,0,1,2,3,0,1,2 sequence with no gaps. 
+ +class NUMASequencer +{ + std::mutex mutex; + std::condition_variable convar; + std::vector<int> thread_ids; // To store thread IDs + size_t cur_index = 0; // Index to track the current thread to execute + bool initialized = false; // Flag to ensure initialization happens once + bool ready_to_run = false; // Flag to ensure threads only start executing when all IDs are collected + +public: + template <class T> + bool + run_sequential(T func) + { + std::unique_lock<std::mutex> lock(mutex); + + int my_thread_id = this_ethread()->id; + int my_numa_node = this_ethread()->get_numa_node(); + + Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) entered run_sequential.", my_thread_id, my_numa_node); + + // Initialize and populate the thread IDs vector + if (!initialized) { + initialized = true; + thread_ids.reserve(eventProcessor.net_threads); // Preallocate space + Debug("numa_sequencer", "[NUMASequencer] Initialized thread ID vector with capacity %d.", eventProcessor.net_threads); + } + + // Add the current thread ID to the list if it's not already present + if (std::find(thread_ids.begin(), thread_ids.end(), my_thread_id) == thread_ids.end()) { + thread_ids.push_back(my_thread_id); + Debug("numa_sequencer", "[NUMASequencer] Added Thread %d to the thread ID list. Total threads collected: %zu", my_thread_id, + thread_ids.size()); + } + + // If all threads have been added (assuming their number is equal to eventProcessor.net_threads), sort the thread IDs and set + // ready_to_run to true + if (thread_ids.size() == eventProcessor.net_threads) { Review Comment: I get a signed/unsigned compare error for this line. ########## src/iocore/cache/RamCacheContainer.cc: ########## @@ -0,0 +1,322 @@ +/** @file + + A cache wrapper that duplicates the underlying cache onto several NUMA nodes + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "P_Cache.h" +#include <array> +#include <mutex> +#include "tscore/NUMADebug.h" +#include "tscore/ink_thread.h" +#include <numaif.h> + +class RamCacheContainer : public RamCache +{ + std::vector<RamCache *> caches; + + int64_t max_bytes = 0; + StripeSM *stripe = nullptr; + bool init_called = false; + RamCache *get_cache(unsigned int my_node, unsigned int node); + +public: + void init_one_cache(); + // returns 1 on found/stored, 0 on not found/stored, if provided auxkey1 and auxkey2 must match + int get(CryptoHash *key, Ptr<IOBufferData> *ret_data, uint64_t auxkey = 0) override; + int put(CryptoHash *key, IOBufferData *data, uint32_t len, bool copy = false, uint64_t auxkey = 0) override; + int fixup(const CryptoHash *key, uint64_t old_auxkey, uint64_t new_auxkey) override; + int64_t size() const override; + void init(int64_t max_bytes_, StripeSM *stripe_) override; + RamCacheContainer(); + ~RamCacheContainer() override; +}; + +extern int cache_config_ram_cache_algorithm; +extern int cache_config_ram_cache_numa_duplicate; + +RamCacheContainer::RamCacheContainer() : caches(numa_max_node() + 1, nullptr) {} + +RamCacheContainer::~RamCacheContainer() +{ + for (auto c : caches) + delete c; +} + +RamCache * +new_RamCacheFromConfig() +{ + switch (cache_config_ram_cache_algorithm) { + default: + 
case RAM_CACHE_ALGORITHM_CLFUS: + return new_RamCacheCLFUS(); + case RAM_CACHE_ALGORITHM_LRU: + return new_RamCacheLRU(); + } +} + +RamCache * +new_RamCacheContainer() +{ + return new RamCacheContainer; +} + +void +RamCacheContainer::init_one_cache() +{ + unsigned int my_node = 0; + getcpu(nullptr, &my_node); + if (caches[my_node]) { + ink_error("Attempt to double-init duplicated cache!"); + } else { + RamCache *cache = new_RamCacheFromConfig(); + cache->init(max_bytes, stripe); + caches[my_node] = cache; + } +} + +RamCache * +RamCacheContainer::get_cache(unsigned int my_node, unsigned int node) Review Comment: ```suggestion RamCacheContainer::get_cache(unsigned int /* my_node ATS_UNUSED */, unsigned int node) ``` ########## src/iocore/cache/RamCacheContainer.cc: ########## @@ -0,0 +1,322 @@ +/** @file + + A cache wrapper that duplicates the underlying cache onto several NUMA nodes + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +#include "P_Cache.h" +#include <array> +#include <mutex> +#include "tscore/NUMADebug.h" +#include "tscore/ink_thread.h" +#include <numaif.h> + +class RamCacheContainer : public RamCache +{ + std::vector<RamCache *> caches; + + int64_t max_bytes = 0; + StripeSM *stripe = nullptr; + bool init_called = false; + RamCache *get_cache(unsigned int my_node, unsigned int node); + +public: + void init_one_cache(); + // returns 1 on found/stored, 0 on not found/stored, if provided auxkey1 and auxkey2 must match + int get(CryptoHash *key, Ptr<IOBufferData> *ret_data, uint64_t auxkey = 0) override; + int put(CryptoHash *key, IOBufferData *data, uint32_t len, bool copy = false, uint64_t auxkey = 0) override; + int fixup(const CryptoHash *key, uint64_t old_auxkey, uint64_t new_auxkey) override; + int64_t size() const override; + void init(int64_t max_bytes_, StripeSM *stripe_) override; + RamCacheContainer(); + ~RamCacheContainer() override; +}; + +extern int cache_config_ram_cache_algorithm; +extern int cache_config_ram_cache_numa_duplicate; + +RamCacheContainer::RamCacheContainer() : caches(numa_max_node() + 1, nullptr) {} + +RamCacheContainer::~RamCacheContainer() +{ + for (auto c : caches) + delete c; +} + +RamCache * +new_RamCacheFromConfig() +{ + switch (cache_config_ram_cache_algorithm) { + default: + case RAM_CACHE_ALGORITHM_CLFUS: + return new_RamCacheCLFUS(); + case RAM_CACHE_ALGORITHM_LRU: + return new_RamCacheLRU(); + } +} + +RamCache * +new_RamCacheContainer() +{ + return new RamCacheContainer; +} + +void +RamCacheContainer::init_one_cache() +{ + unsigned int my_node = 0; + getcpu(nullptr, &my_node); + if (caches[my_node]) { + ink_error("Attempt to double-init duplicated cache!"); + } else { + RamCache *cache = new_RamCacheFromConfig(); + cache->init(max_bytes, stripe); + caches[my_node] = cache; + } +} + +RamCache * +RamCacheContainer::get_cache(unsigned int my_node, unsigned int node) +{ + return caches[node]; +} + +static void * 
+ram_cache_container_thread_init_func(void *arg) +{ + ((RamCacheContainer *)arg)->init_one_cache(); + return nullptr; +} + +void +RamCacheContainer::init(int64_t max_bytes_, StripeSM *stripe_) +{ + max_bytes = max_bytes_; + stripe = stripe_; + init_called = true; + std::vector<ink_thread> threads; + for (unsigned int node = 0; node < (unsigned int)caches.size(); ++node) { + ink_thread t; + hwloc_obj_t obj = hwloc_get_obj_by_type(ink_get_topology(), HWLOC_OBJ_NODE, node); + ink_thread_create(&t, ram_cache_container_thread_init_func, (void *)this, false, 0, nullptr, obj->cpuset); + threads.push_back(t); + } + for (auto t : threads) { + ink_thread_join(t); + } + for (auto cache : caches) { + if (!cache) { + ink_fatal("Failed to initialize NUMA local ram cache."); + } + } +} + +constexpr size_t PAGE_SIZE = 4096; + +static void * +align_pointer_to_page(const void *ptr) +{ + // Align to the page size (move_page requires that) + return reinterpret_cast<void *>(reinterpret_cast<intptr_t>(ptr) & (~(PAGE_SIZE - 1))); +} + +static size_t +page_count(const void *ptr, size_t size) +{ + auto start = reinterpret_cast<intptr_t>(ptr); + return (start + size + (PAGE_SIZE - 1)) / PAGE_SIZE - start / PAGE_SIZE; +} + +// returns true if consistent +static bool +check_pages_consistency(void *data, size_t size, const char *name = "") Review Comment: This function is unused unless `NUMA_CONSISTENCY_CHECK` is defined. Please add a preprocessor guard here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@trafficserver.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org