cmcfarlen commented on code in PR #11871:
URL: https://github.com/apache/trafficserver/pull/11871#discussion_r1844188335


##########
src/iocore/net/Server.cc:
##########
@@ -88,49 +104,205 @@ Server::close()
   return sock.close();
 }
 
+#if TS_USE_NUMA
+// Assumes that threads can be assigned to NUMA zones as 0,1,2,3,0,1,2,3,0,1,2 
sequence with no gaps.
+
+class NUMASequencer
+{
+  std::mutex              mutex;
+  std::condition_variable convar;
+  std::vector<int>        thread_ids;           // To store thread IDs
+  size_t                  cur_index    = 0;     // Index to track the current 
thread to execute
+  bool                    initialized  = false; // Flag to ensure 
initialization happens once
+  bool                    ready_to_run = false; // Flag to ensure threads only 
start executing when all IDs are collected
+
+public:
+  template <class T>
+  bool
+  run_sequential(T func)
+  {
+    std::unique_lock<std::mutex> lock(mutex);
+
+    int my_thread_id = this_ethread()->id;
+    int my_numa_node = this_ethread()->get_numa_node();
+
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) entered 
run_sequential.", my_thread_id, my_numa_node);
+
+    // Initialize and populate the thread IDs vector
+    if (!initialized) {
+      initialized = true;
+      thread_ids.reserve(eventProcessor.net_threads); // Preallocate space
+      Debug("numa_sequencer", "[NUMASequencer] Initialized thread ID vector 
with capacity %d.", eventProcessor.net_threads);
+    }
+
+    // Add the current thread ID to the list if it's not already present
+    if (std::find(thread_ids.begin(), thread_ids.end(), my_thread_id) == 
thread_ids.end()) {
+      thread_ids.push_back(my_thread_id);
+      Debug("numa_sequencer", "[NUMASequencer] Added Thread %d to the thread 
ID list. Total threads collected: %zu", my_thread_id,
+            thread_ids.size());
+    }
+
+    // If all threads have been added (assuming their number is equal to 
eventProcessor.net_threads), sort the thread IDs and set
+    // ready_to_run to true
+    if (thread_ids.size() == eventProcessor.net_threads) {
+      std::sort(thread_ids.begin(), thread_ids.end());
+      Debug("numa_sequencer", "[NUMASequencer] All thread IDs collected. 
Sorting thread IDs...");
+      Debug("numa_sequencer", "[NUMASequencer] Thread IDs sorted. Execution 
will follow this order:");
+      for (size_t i = 0; i < thread_ids.size(); ++i) {
+        Debug("numa_sequencer", "[NUMASequencer] Execution order %zu: Thread 
ID %d", i + 1, thread_ids[i]);
+      }
+      ready_to_run = true;
+      convar.notify_all(); // Notify all threads that execution can begin
+    }
+
+    // Wait until all thread IDs are collected and ready_to_run is true
+    while (!ready_to_run) {
+      Debug("numa_sequencer", "[NUMASequencer] Thread %d is waiting for all 
thread IDs to be collected.", my_thread_id);
+      convar.wait(lock);
+    }
+
+    // Logging the current state before entering the wait loop
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) waiting 
to execute. Current sequence index: %zu",
+          my_thread_id, my_numa_node, cur_index);
+
+    // Wait until it's this thread's turn based on sorted IDs
+    while (cur_index < thread_ids.size() && thread_ids[cur_index] != 
my_thread_id) {
+      Debug("numa_sequencer", "[NUMASequencer] Thread %d is not yet in 
sequence. Waiting...", my_thread_id);
+      convar.wait(lock);
+    }
+
+    // Log when the thread has been awakened and is about to execute the 
function
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) 
awakened. About to execute function.", my_thread_id,
+          my_numa_node);
+
+    // Execute the function
+    bool result = func();
+
+    // More detailed logging for debugging
+    if (result) {
+      Debug("numa_sequencer", "[NUMASequencer] Thread %d successfully executed 
the function on NUMA node %d.", this_ethread()->id,
+            my_numa_node);
+    } else {
+      Error("[NUMASequencer] Thread %d failed to execute the function on NUMA 
node %d.", this_ethread()->id, my_numa_node);
+    }
+
+    // Move to the next thread in the sequence
+    cur_index++;
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d completed execution. 
Moving to next thread. New index: %zu.", my_thread_id,
+          cur_index);
+
+    // If we've completed one pass through all threads, reset the index and 
increment the repeat counter
+    if (cur_index >= thread_ids.size()) {
+      cur_index = 0;
+      Debug("numa_sequencer", "[NUMASequencer] Completed a full pass through 
all threads. Resetting index.");
+    }
+
+    // Notify all threads about the change in sequence
+    convar.notify_all();
+
+    return result;
+  }
+};
+NUMASequencer numa_sequencer;
+#endif
+
 int
 Server::listen(bool non_blocking, const NetProcessor::AcceptOptions &opt)
 {
   ink_assert(!sock.is_ok());
   int       res = 0;
   socklen_t namelen;
   int       prot = IPPROTO_TCP;
+#if TS_USE_NUMA
+  int  use_ebpf = 0;
+  int  affinity = 1;
+  bool success  = false;
+
+  // Define the initialization function that will be run sequentially
+  auto init_func = [&]() -> bool {
+    // Additional setup after listen
+    Debug("numa", "[Server::listen] Attempting to set up fd after listen with 
options: %d", opt);
+    if ((res = setup_fd_after_listen(opt)) < 0) {
+      Error("[Server::listen] Failed to setup fd after listen: %d", res);
+      return false;
+    }
 
+    Debug("numa", "[Server::listen] Thread %d successfully set up the 
socket.", this_ethread()->id);
+    return true;
+  };
+#endif
+  // Set the IP address for binding
   if (!ats_is_ip(&accept_addr)) {
     ats_ip4_set(&addr, INADDR_ANY, 0);
   } else {
     ats_ip_copy(&addr, &accept_addr);
   }
-
+  // Set protocol for MPTCP if enabled
   if (opt.f_mptcp) {
     Dbg(dbg_ctl_connection, "Define socket with MPTCP");
     prot = IPPROTO_MPTCP;
   }
 
+  // Create the socket
+  Debug("numa", "[Server::listen] Attempting to create socket with family: %d, 
type: %d, protocol: %d", addr.sa.sa_family,

Review Comment:
   These Debug lines look like they were added for your own debugging. Could they be removed?



##########
src/cripts/Lulu.cc:
##########
@@ -163,6 +163,12 @@ cripts::Splitter(cripts::string_view input, char delim)
   return details::Splitter<cripts::string_view>(input, delim);
 }
 
+std::vector<Cript::string_view>
+Cript::splitter(Cript::string_view input, char delim)
+{
+  return details::splitter<Cript::string_view>(input, delim);
+}
+

Review Comment:
   You can remove this chunk; it is perhaps left over from a merge conflict.



##########
src/iocore/net/Server.cc:
##########
@@ -88,49 +104,205 @@ Server::close()
   return sock.close();
 }
 
+#if TS_USE_NUMA
+// Assumes that threads can be assigned to NUMA zones as 0,1,2,3,0,1,2,3,0,1,2 
sequence with no gaps.
+
+class NUMASequencer
+{
+  std::mutex              mutex;
+  std::condition_variable convar;
+  std::vector<int>        thread_ids;           // To store thread IDs
+  size_t                  cur_index    = 0;     // Index to track the current 
thread to execute
+  bool                    initialized  = false; // Flag to ensure 
initialization happens once
+  bool                    ready_to_run = false; // Flag to ensure threads only 
start executing when all IDs are collected
+
+public:
+  template <class T>
+  bool
+  run_sequential(T func)
+  {
+    std::unique_lock<std::mutex> lock(mutex);
+
+    int my_thread_id = this_ethread()->id;
+    int my_numa_node = this_ethread()->get_numa_node();
+
+    Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) entered 
run_sequential.", my_thread_id, my_numa_node);

Review Comment:
   The Debug macro has been removed; please use the Dbg macro instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@trafficserver.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to