cmcfarlen commented on code in PR #11871:
URL: https://github.com/apache/trafficserver/pull/11871#discussion_r1844188335
##########
src/iocore/net/Server.cc:
##########
@@ -88,49 +104,205 @@ Server::close()
   return sock.close();
 }
+#if TS_USE_NUMA
+// Assumes that threads can be assigned to NUMA zones as 0,1,2,3,0,1,2,3,0,1,2 sequence with no gaps.
+
+class NUMASequencer
+{
+  std::mutex              mutex;
+  std::condition_variable convar;
+  std::vector<int>        thread_ids;           // To store thread IDs
+  size_t                  cur_index    = 0;     // Index to track the current thread to execute
+  bool                    initialized  = false; // Flag to ensure initialization happens once
+  bool                    ready_to_run = false; // Flag to ensure threads only start executing when all IDs are collected
+
+public:
+  template <class T>
+  bool
+  run_sequential(T func)
+  {
+    std::unique_lock<std::mutex> lock(mutex);
+
+    int my_thread_id = this_ethread()->id;
+    int my_numa_node = this_ethread()->get_numa_node();
+
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) entered run_sequential.", my_thread_id, my_numa_node);
+
+    // Initialize and populate the thread IDs vector
+    if (!initialized) {
+      initialized = true;
+      thread_ids.reserve(eventProcessor.net_threads); // Preallocate space
+      Debug("numa_sequencer", "[NUMASequencer] Initialized thread ID vector with capacity %d.", eventProcessor.net_threads);
+    }
+
+    // Add the current thread ID to the list if it's not already present
+    if (std::find(thread_ids.begin(), thread_ids.end(), my_thread_id) == thread_ids.end()) {
+      thread_ids.push_back(my_thread_id);
+      Debug("numa_sequencer", "[NUMASequencer] Added Thread %d to the thread ID list. Total threads collected: %zu", my_thread_id,
+            thread_ids.size());
+    }
+
+    // If all threads have been added (assuming their number is equal to eventProcessor.net_threads), sort the thread IDs and set
+    // ready_to_run to true
+    if (thread_ids.size() == eventProcessor.net_threads) {
+      std::sort(thread_ids.begin(), thread_ids.end());
+      Debug("numa_sequencer", "[NUMASequencer] All thread IDs collected. Sorting thread IDs...");
+      Debug("numa_sequencer", "[NUMASequencer] Thread IDs sorted. Execution will follow this order:");
+      for (size_t i = 0; i < thread_ids.size(); ++i) {
+        Debug("numa_sequencer", "[NUMASequencer] Execution order %zu: Thread ID %d", i + 1, thread_ids[i]);
+      }
+      ready_to_run = true;
+      convar.notify_all(); // Notify all threads that execution can begin
+    }
+
+    // Wait until all thread IDs are collected and ready_to_run is true
+    while (!ready_to_run) {
+      Debug("numa_sequencer", "[NUMASequencer] Thread %d is waiting for all thread IDs to be collected.", my_thread_id);
+      convar.wait(lock);
+    }
+
+    // Logging the current state before entering the wait loop
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) waiting to execute. Current sequence index: %zu",
+          my_thread_id, my_numa_node, cur_index);
+
+    // Wait until it's this thread's turn based on sorted IDs
+    while (cur_index < thread_ids.size() && thread_ids[cur_index] != my_thread_id) {
+      Debug("numa_sequencer", "[NUMASequencer] Thread %d is not yet in sequence. Waiting...", my_thread_id);
+      convar.wait(lock);
+    }
+
+    // Log when the thread has been awakened and is about to execute the function
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) awakened. About to execute function.", my_thread_id,
+          my_numa_node);
+
+    // Execute the function
+    bool result = func();
+
+    // More detailed logging for debugging
+    if (result) {
+      Debug("numa_sequencer", "[NUMASequencer] Thread %d successfully executed the function on NUMA node %d.", this_ethread()->id,
+            my_numa_node);
+    } else {
+      Error("[NUMASequencer] Thread %d failed to execute the function on NUMA node %d.", this_ethread()->id, my_numa_node);
+    }
+
+    // Move to the next thread in the sequence
+    cur_index++;
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d completed execution. Moving to next thread. New index: %zu.", my_thread_id,
+          cur_index);
+
+    // If we've completed one pass through all threads, reset the index and increment the repeat counter
+    if (cur_index >= thread_ids.size()) {
+      cur_index = 0;
+      Debug("numa_sequencer", "[NUMASequencer] Completed a full pass through all threads. Resetting index.");
+    }
+
+    // Notify all threads about the change in sequence
+    convar.notify_all();
+
+    return result;
+  }
+};
+NUMASequencer numa_sequencer;
+#endif
+
 int
 Server::listen(bool non_blocking, const NetProcessor::AcceptOptions &opt)
 {
   ink_assert(!sock.is_ok());
   int       res = 0;
   socklen_t namelen;
   int       prot = IPPROTO_TCP;
+#if TS_USE_NUMA
+  int  use_ebpf = 0;
+  int  affinity = 1;
+  bool success  = false;
+
+  // Define the initialization function that will be run sequentially
+  auto init_func = [&]() -> bool {
+    // Additional setup after listen
+    Debug("numa", "[Server::listen] Attempting to set up fd after listen with options: %d", opt);
+    if ((res = setup_fd_after_listen(opt)) < 0) {
+      Error("[Server::listen] Failed to setup fd after listen: %d", res);
+      return false;
+    }
+    Debug("numa", "[Server::listen] Thread %d successfully set up the socket.", this_ethread()->id);
+    return true;
+  };
+#endif
+  // Set the IP address for binding
   if (!ats_is_ip(&accept_addr)) {
     ats_ip4_set(&addr, INADDR_ANY, 0);
   } else {
     ats_ip_copy(&addr, &accept_addr);
   }
-
+  // Set protocol for MPTCP if enabled
   if (opt.f_mptcp) {
     Dbg(dbg_ctl_connection, "Define socket with MPTCP");
     prot = IPPROTO_MPTCP;
   }
+  // Create the socket
+  Debug("numa", "[Server::listen] Attempting to create socket with family: %d, type: %d, protocol: %d", addr.sa.sa_family,

Review Comment:
   These Debug lines look like they were left in from your own debugging. Could they be removed?



##########
src/cripts/Lulu.cc:
##########
@@ -163,6 +163,12 @@ cripts::Splitter(cripts::string_view input, char delim)
   return details::Splitter<cripts::string_view>(input, delim);
 }
 
+std::vector<Cript::string_view>
+Cript::splitter(Cript::string_view input, char delim)
+{
+  return details::splitter<Cript::string_view>(input, delim);
+}
+

Review Comment:
   You can remove this chunk; it looks like a leftover from a merge conflict.



##########
src/iocore/net/Server.cc:
##########
@@ -88,49 +104,205 @@ Server::close()
   return sock.close();
 }
+#if TS_USE_NUMA
+// Assumes that threads can be assigned to NUMA zones as 0,1,2,3,0,1,2,3,0,1,2 sequence with no gaps.
+
+class NUMASequencer
+{
+  std::mutex              mutex;
+  std::condition_variable convar;
+  std::vector<int>        thread_ids;           // To store thread IDs
+  size_t                  cur_index    = 0;     // Index to track the current thread to execute
+  bool                    initialized  = false; // Flag to ensure initialization happens once
+  bool                    ready_to_run = false; // Flag to ensure threads only start executing when all IDs are collected
+
+public:
+  template <class T>
+  bool
+  run_sequential(T func)
+  {
+    std::unique_lock<std::mutex> lock(mutex);
+
+    int my_thread_id = this_ethread()->id;
+    int my_numa_node = this_ethread()->get_numa_node();
+
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) entered run_sequential.", my_thread_id, my_numa_node);

Review Comment:
   The Debug macro has been removed; please use the Dbg macro instead.
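
For illustration only (not part of the PR): a minimal sketch of the kind of conversion the comment above asks for, following the DbgCtl/Dbg pattern that Server.cc already uses for dbg_ctl_connection. The dbg_ctl_numa_sequencer control and its tag name are assumptions; the actual names are up to the author.

   namespace
   {
   // Assumed debug control; the "numa_sequencer" tag mirrors the tag used in the old Debug() calls.
   DbgCtl dbg_ctl_numa_sequencer{"numa_sequencer"};
   } // end anonymous namespace

   // Old style (Debug macro, no longer available):
   //   Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) entered run_sequential.", my_thread_id, my_numa_node);

   // New style (Dbg macro with an explicit DbgCtl, same shape as the existing Dbg(dbg_ctl_connection, ...) call):
   Dbg(dbg_ctl_numa_sequencer, "[NUMASequencer] Thread %d (NUMA node %d) entered run_sequential.", my_thread_id, my_numa_node);

Assuming the tag name stays the same, enabling it at runtime (for example via proxy.config.diags.debug.tags) works the same way as it did with the old Debug() calls.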