laforge has submitted this change. ( 
https://gerrit.osmocom.org/c/upf-benchmark/+/38328?usp=email )

Change subject: Introduce osmo-udp-responder
......................................................................

Introduce osmo-udp-responder

Related: SYS#7096
Change-Id: I9edc2e3e1e41767673bcf96f1fe97fa4bf6d60f7
---
M configure.ac
M src/Makefile.am
A src/osmo-udp-responder/Makefile.am
A src/osmo-udp-responder/udp_responder.c
4 files changed, 823 insertions(+), 0 deletions(-)

Approvals:
  laforge: Looks good to me, approved; Verified




diff --git a/configure.ac b/configure.ac
index cc2b3c4..ec92355 100644
--- a/configure.ac
+++ b/configure.ac
@@ -164,6 +164,7 @@
     include/osmocom/pfcptool/Makefile
     src/Makefile
     src/osmo-pfcp-tool/Makefile
+    src/osmo-udp-responder/Makefile
     doc/Makefile
     doc/manuals/Makefile
     Makefile)
diff --git a/src/Makefile.am b/src/Makefile.am
index fecf898..731fc5e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1,3 +1,4 @@
 SUBDIRS = \
        osmo-pfcp-tool \
+       osmo-udp-responder \
        $(NULL)
diff --git a/src/osmo-udp-responder/Makefile.am 
b/src/osmo-udp-responder/Makefile.am
new file mode 100644
index 0000000..329c5f2
--- /dev/null
+++ b/src/osmo-udp-responder/Makefile.am
@@ -0,0 +1,25 @@
+AM_CPPFLAGS = \
+       $(all_includes) \
+       -I$(top_srcdir)/include \
+       -I$(top_builddir)/include \
+       -I$(top_builddir) \
+       $(NULL)
+
+AM_CFLAGS = \
+       -Wall \
+       $(LIBOSMOCORE_CFLAGS) \
+       $(LIBURING_CFLAGS) \
+       $(NULL)
+
+AM_LDFLAGS = \
+       $(LIBOSMOCORE_LIBS) \
+       $(LIBURING_LIBS) \
+       $(NULL)
+
+bin_PROGRAMS = \
+       osmo-udp-responder \
+       $(NULL)
+
+osmo_udp_responder_SOURCES = \
+       udp_responder.c \
+       $(NULL)
diff --git a/src/osmo-udp-responder/udp_responder.c 
b/src/osmo-udp-responder/udp_responder.c
new file mode 100644
index 0000000..5ffe707
--- /dev/null
+++ b/src/osmo-udp-responder/udp_responder.c
@@ -0,0 +1,796 @@
+/* UDP responder: listen on a UDP port, and respond to each received UDP 
packet back to the sender. */
+/*
+ * (C) 2024 by sysmocom - s.f.m.c. GmbH <[email protected]>
+ * All Rights Reserved.
+ *
+ * Author: Neels Janosch Hofmeyr <[email protected]>
+ * Author: Pau Espin Pedrol <[email protected]>
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+#include "config.h"
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <osmocom/core/application.h>
+#include <osmocom/core/logging.h>
+#include <osmocom/core/timer.h>
+
+#if HAVE_URING
+
+#include <getopt.h>
+#include <string.h>
+#include <limits.h>
+#include <unistd.h>
+#include <liburing.h>
+#include <pthread.h>
+#include <sched.h>
+#include <netinet/in.h>
+
+#include <osmocom/core/utils.h>
+#include <osmocom/core/socket.h>
+#include <osmocom/core/sockaddr_str.h>
+
+#include <osmocom/pfcptool/gtp_flood.h>
+
+static volatile bool started;
+static struct timespec ts_start;
+
+struct cmdline_cmd {
+       const char *short_option;
+       const char *long_option;
+       const char *arg_name;
+       const char *doc;
+
+       const char *value;
+};
+
+#define cmdline_foreach(ITER, CMDS) \
+       for (const struct cmdline_cmd *ITER = (CMDS); \
+            ITER->short_option || ITER->long_option || ITER->arg_name; \
+            ITER++)
+
+int cmdline_doc_str_buf(char *buf, size_t buflen, const struct cmdline_cmd 
*cmds)
+{
+       struct osmo_strbuf sb = { .buf = buf, .len = buflen };
+       /* First find the longest options part */
+       int w = 0;
+       cmdline_foreach(cmd, cmds) {
+               int cmd_w = 0;
+               if (cmd->short_option)
+                       cmd_w += 2 + strlen(cmd->short_option);
+               if (cmd->long_option)
+                       cmd_w += 3 + strlen(cmd->long_option);
+               if (cmd->arg_name)
+                       cmd_w += 1 + strlen(cmd->arg_name);
+               w = OSMO_MAX(w, cmd_w);
+       }
+       /* vertical gap */
+       w += 2;
+
+       OSMO_STRBUF_PRINTF(sb, "Options:\n");
+       cmdline_foreach(cmd, cmds) {
+               char *line_start = sb.pos;
+               if (cmd->short_option)
+                       OSMO_STRBUF_PRINTF(sb, " -%s", cmd->short_option);
+               if (cmd->long_option)
+                       OSMO_STRBUF_PRINTF(sb, " --%s", cmd->long_option);
+               if (cmd->arg_name)
+                       OSMO_STRBUF_PRINTF(sb, " %s", cmd->arg_name);
+               if (cmd->doc) {
+                       int have = sb.pos - line_start;
+                       int spaces = OSMO_MAX(1, w - have);
+                       OSMO_STRBUF_PRINTF(sb, "%*s", spaces, "");
+                       OSMO_STRBUF_PRINTF(sb, "%s", cmd->doc);
+               }
+               OSMO_STRBUF_PRINTF(sb, "\n");
+       }
+       return sb.chars_needed;
+}
+
+void cmdline_print_help(const struct cmdline_cmd *cmds)
+{
+       char buf[8192];
+       cmdline_doc_str_buf(buf, sizeof(buf), cmds);
+       printf("%s", buf);
+}
+
+void cmdline_cmd_store_optarg(struct cmdline_cmd *cmd)
+{
+       if (cmd->arg_name)
+               cmd->value = optarg;
+       else
+               cmd->value = (cmd->short_option ? : cmd->long_option);
+}
+
+int cmdline_read(struct cmdline_cmd *cmds, int argc, char **argv)
+{
+       char short_options[256] = {};
+       struct option long_options[128] = {};
+       int long_options_i = 0;
+       int long_option_val = 0;
+       struct osmo_strbuf short_sb = { .buf = short_options, .len = 
sizeof(short_options) };
+
+       cmdline_foreach(cmd, cmds) {
+               if (cmd->short_option) {
+                       OSMO_STRBUF_PRINTF(short_sb, "%s", cmd->short_option);
+                       if (cmd->arg_name)
+                               OSMO_STRBUF_PRINTF(short_sb, ":");
+               }
+               if (cmd->long_option) {
+                       long_options[long_options_i] = (struct option){
+                               cmd->long_option,
+                               cmd->arg_name ? 1 : 0,
+                               &long_option_val,
+                               long_options_i,
+                       };
+                       long_options_i++;
+               }
+       }
+
+       while (1) {
+               int option_index = 0;
+               char c = getopt_long(argc, argv, short_options, long_options, 
&option_index);
+               if (c == -1)
+                       break;
+               if (c == 0) {
+                       struct cmdline_cmd *long_cmd = &cmds[long_option_val];
+                       cmdline_cmd_store_optarg(long_cmd);
+               } else {
+                       bool found = false;
+                       cmdline_foreach(cc, cmds) {
+                               if (strchr(cc->short_option, c)) {
+                                       cmdline_cmd_store_optarg((struct 
cmdline_cmd *)cc);
+                                       found = true;
+                                       break;
+                               }
+                       }
+                       if (!found) {
+                               fprintf(stderr, "%s: Error in command line 
options. Exiting.\n", argv[0]);
+                               return -1;
+                       }
+               }
+       }
+
+       /* positional args */
+       cmdline_foreach(cmd, cmds) {
+               if (optind >= argc)
+                       break;
+               if (cmd->short_option || cmd->long_option)
+                       continue;
+               if (!cmd->arg_name)
+                       continue;
+               ((struct cmdline_cmd *)cmd)->value = argv[optind];
+               optind++;
+       }
+
+       if (optind < argc) {
+               cmdline_print_help(cmds);
+               fprintf(stderr, "%s: Unsupported positional argument on command 
line\n", argv[optind]);
+               return -1;
+       }
+       return 0;
+}
+
+const char *cmdline_get(const struct cmdline_cmd *cmds, const char 
*option_name, const char *default_val)
+{
+       cmdline_foreach(cmd, cmds) {
+               if (cmd->long_option && !strcmp(cmd->long_option, option_name))
+                       return cmd->value;
+               if (cmd->short_option && !strcmp(cmd->short_option, 
option_name))
+                       return cmd->value;
+               if (cmd->arg_name && !strcmp(cmd->arg_name, option_name))
+                       return cmd->value;
+       }
+       return default_val;
+}
+
+bool cmdline_get_int(int *dst, int minval, int maxval, int default_val,
+                    const struct cmdline_cmd *cmds, const char *option_name)
+{
+       const char *str = cmdline_get(cmds, option_name, NULL);
+       if (!str) {
+               *dst = default_val;
+               return true;
+       }
+       if (osmo_str_to_int(dst, str, 10, minval, maxval)) {
+               cmdline_print_help(cmds);
+               printf("ERROR: invalid integer number: %s\n", str);
+               return false;
+       }
+       if (*dst < minval || *dst > maxval) {
+               cmdline_print_help(cmds);
+               printf("ERROR: number out of range: %d <= %d <= %d\n", minval, 
*dst, maxval);
+               return false;
+       }
+       return true;
+}
+
+/* returns number of configured CPUs in the system, or negative otherwise */
+static int get_num_cpus(void)
+{
+       static unsigned int num_cpus = 0;
+       long ln;
+
+       if (num_cpus)
+               return num_cpus;
+
+       /* This is expensive (goes across /sys, so let's do it only once. It is
+        * guaranteed it won't change during process span anyway). */
+       ln = sysconf(_SC_NPROCESSORS_CONF);
+       if (ln < 0) {
+               fprintf(stderr, "sysconf(_SC_NPROCESSORS_CONF) failed: %s\n", 
strerror(errno));
+               return -1;
+       }
+       num_cpus = (unsigned int) ln;
+       return num_cpus;
+}
+
+struct udp_port {
+       struct llist_head entry;
+
+       /* IP address and UDP port from user input */
+       struct osmo_sockaddr osa;
+
+       /* locally bound socket */
+       int fd;
+};
+
+enum data_io_type {
+       IO_UNUSED = 0,
+       IO_RECV,
+       IO_SEND,
+};
+
+struct data_io {
+       enum data_io_type type;
+       struct osmo_sockaddr osa;
+       struct iovec iov;
+       struct msghdr msgh;
+       uint8_t *data;
+       size_t data_size;
+       uint8_t *control;
+       size_t control_size;
+       int n;
+};
+
+struct io_queue {
+       size_t d_size;
+       struct data_io d[0];
+};
+
+struct counter {
+       uint64_t count;
+       uint64_t last;
+};
+
+struct traffic_counter {
+       struct counter packets;
+       struct counter bytes;
+};
+
+uint64_t counter_get(struct counter *c)
+{
+       uint64_t cnt = c->count;
+       uint64_t val = cnt - c->last;
+       c->last = cnt;
+       return val;
+}
+
+struct traffic_counter g_rx = {};
+struct traffic_counter g_tx = {};
+
+struct worker {
+       int id;
+       struct io_queue *q;
+       struct io_uring ring;
+       pthread_t worker;
+       struct {
+               unsigned long long rx_packets;
+               unsigned long long rx_bytes;
+               unsigned long long tx_packets;
+               unsigned long long tx_bytes;
+       } ctr;
+};
+
+struct {
+       int port_nr;
+       const char *local_addr;
+       int queue_size;
+       int buf_size;
+       int control_size;
+       int response_size;
+       int response_n;
+       int workers_n;
+       struct udp_port port;
+       bool cpu_affinity;
+} cfg = {};
+
+static void data_io_prepare(struct worker *w, struct data_io *d)
+{
+       void *data_buf = talloc_size(w->q, cfg.buf_size);
+       void *control_buf = talloc_size(w->q, cfg.control_size);
+       *d = (struct data_io){
+               .type = IO_RECV,
+               .iov = {
+                       .iov_base = data_buf,
+                       .iov_len = cfg.buf_size,
+               },
+               .msgh = {
+                       .msg_name = &d->osa,
+                       .msg_namelen = sizeof(d->osa),
+                       .msg_iov = &d->iov,
+                       .msg_iovlen = 1,
+                       .msg_control = control_buf,
+                       .msg_controllen = cfg.control_size,
+               },
+               .data = data_buf,
+               .data_size = cfg.buf_size,
+               .control = control_buf,
+               .control_size = cfg.control_size,
+       };
+}
+
+static void data_io_prep_recv(struct io_uring *ring, struct udp_port *port, 
struct data_io *d)
+{
+       struct io_uring_sqe *sqe;
+       d->type = IO_RECV;
+       d->iov.iov_len = d->data_size;
+       d->msgh.msg_controllen = d->control_size;
+       sqe = io_uring_get_sqe(ring);
+       OSMO_ASSERT(sqe);
+       io_uring_prep_recvmsg(sqe, port->fd, &d->msgh, 0);
+       io_uring_sqe_set_data(sqe, d);
+}
+
+static void data_io_prep_send(struct io_uring *ring, struct udp_port *port, 
struct data_io *d)
+{
+       struct io_uring_sqe *sqe;
+       d->type = IO_SEND;
+       sqe = io_uring_get_sqe(ring);
+       OSMO_ASSERT(sqe);
+       io_uring_prep_sendmsg(sqe, port->fd, &d->msgh, 0);
+       io_uring_sqe_set_data(sqe, d);
+}
+
+static bool get_payload_info(struct gtp_flood_payload_info *dst, struct 
data_io *d)
+{
+       uint8_t *pi;
+       uint8_t *len;
+       size_t copy_len;
+
+       len = d->iov.iov_base + d->iov.iov_len - 1;
+       if ((*len) > d->iov.iov_len)
+               return false;
+       pi = len - (*len);
+       if (strncmp((void *)pi, "info", 4))
+               return false;
+       copy_len = OSMO_MIN(sizeof(*dst), *len);
+       *dst = (struct gtp_flood_payload_info){};
+       memcpy((void *)dst, pi, copy_len);
+       return true;
+}
+
+static void data_io_handle_completion(struct worker *w, struct udp_port *port, 
struct io_uring_cqe *cqe,
+                                     int response_size, int response_n)
+{
+       struct data_io *d;
+       struct osmo_sockaddr *osa = NULL;
+       int rc;
+       struct gtp_flood_payload_info pi;
+       struct io_uring *ring = &w->ring;
+
+       d = io_uring_cqe_get_data(cqe);
+
+       osa = &d->osa;
+       rc = cqe->res;
+
+       switch (d->type) {
+       case IO_RECV:
+               /* done reading */
+               if (OSMO_UNLIKELY(!started)) {
+                       started = true;
+                       OSMO_ASSERT(clock_gettime(CLOCK_MONOTONIC, &ts_start) 
== 0);
+               }
+               if (rc <= 0) {
+                       LOGP(DLGLOBAL, LOGL_ERROR, "%s -> rx error rc=%d 
flags=0x%x\n",
+                       osa ? osmo_sockaddr_to_str(osa) : "NULL",
+                       rc, cqe->flags);
+                       return;
+               }
+#if 0
+               LOGP(DLGLOBAL, LOGL_DEBUG, "%s -> rx rc=%d flags=0x%x: %s\n",
+                    osa ? osmo_sockaddr_to_str(osa) : "NULL",
+                    rc, cqe->flags,
+                    osmo_quote_str(d->iov.iov_base, rc));
+#endif
+               io_uring_cqe_seen(ring, cqe);
+               w->ctr.rx_packets++;
+               w->ctr.rx_bytes += rc;
+
+               if (response_n < 1) {
+                       data_io_prep_recv(ring, port, d);
+                       break;
+               }
+
+               d->iov.iov_len = rc;
+               if (get_payload_info(&pi, d)) {
+                       /* set the return TEID */
+                       struct gtp1u_hdr *gtp_hdr = (void *)d->iov.iov_base;
+                       gtp_hdr->tei = pi.return_teid;
+               }
+
+               /* resubmit back to sender */
+
+               /* adjust size? */
+               if (response_size > 0)
+                       d->iov.iov_len = response_size;
+
+               data_io_prep_send(ring, port, d);
+               break;
+
+       case IO_SEND:
+               /* done writing. */
+               if (rc <= 0) {
+                       LOGP(DLGLOBAL, LOGL_ERROR, "%s -> tx error rc=%d 
flags=0x%x\n",
+                       osa ? osmo_sockaddr_to_str(osa) : "NULL",
+                       rc, cqe->flags);
+                       return;
+               }
+#if 0
+               LOGP(DLGLOBAL, LOGL_DEBUG, "%s <- tx rc=%d flags=0x%x: %s\n",
+                    osa ? osmo_sockaddr_to_str(osa) : "NULL",
+                    rc, cqe->flags,
+                    osmo_quote_str(d->iov.iov_base, rc));
+#endif
+               io_uring_cqe_seen(ring, cqe);
+               w->ctr.tx_packets++;
+               w->ctr.tx_bytes += rc;
+
+               d->n++;
+
+               /* Send again? If not, re-submit open slot for reading. */
+               if (d->n < response_n)
+                       data_io_prep_send(ring, port, d);
+               else
+                       data_io_prep_recv(ring, port, d);
+               break;
+
+       default:
+               OSMO_ASSERT(0);
+       }
+}
+
+struct cmdline_cmd cmds[] = {
+       {
+               .short_option = "h",
+               .long_option = "help",
+               .doc = "Show this help",
+       },
+       {
+               .short_option = "l",
+               .long_option = "local-addr",
+               .arg_name = "IP-ADDR",
+               .doc = "Listen on local IP address (default is 0.0.0.0).",
+       },
+       {
+               .short_option = "p",
+               .long_option = "port",
+               .arg_name = "UDP-PORT",
+               .doc = "Listen on local UDP port.",
+       },
+       /*
+       {
+               .short_option = "P",
+               .long_option = "port-range-to",
+               .arg_name = "UDP-PORT-TO",
+               .doc = "Listen on a range of ports, from --port to 
--port-range-to, inclusive.",
+       },
+       */
+       {
+               .short_option = "s",
+               .long_option = "response-size",
+               .arg_name = "BYTES",
+               .doc = "When responding, enlarge or shorten the payload to this 
size.",
+       },
+       {
+               .short_option = "n",
+               .long_option = "response-repeat",
+               .arg_name = "N",
+               .doc = "Respond N times, i.e. multiply the returned traffic.",
+       },
+       {
+               .short_option = "a",
+               .long_option = "cpu-affinity",
+               .doc = "Pin each Nth worker to a Nth cpu.",
+       },
+       {
+               .long_option = "io-uring-queue",
+               .arg_name = "SIZE",
+               .doc = "I/O tuning: queue size to use for io_uring, default is 
4000.",
+       },
+       {
+               .long_option = "io-uring-buf",
+               .arg_name = "SIZE",
+               .doc = "I/O tuning: maximum payload size, default is 1452.",
+       },
+       {
+               .long_option = "workers",
+               .arg_name = "N",
+               .doc = "Number of rx threads to run",
+       },
+       {}
+};
+
+static const struct log_info_cat categories[] = {
+};
+
+const struct log_info udp_responder_log_info = {
+       .cat = categories,
+       .num_cat = ARRAY_SIZE(categories),
+};
+
+static void start_rx_worker(struct worker *w);
+
+int main(int argc, char **argv)
+{
+       struct osmo_sockaddr_str addr = {};
+       struct osmo_sockaddr osa = {};
+       int val;
+       int rc;
+
+       osmo_init_logging2(OTC_GLOBAL, &udp_responder_log_info);
+       log_set_log_level(osmo_stderr_target, LOGL_ERROR);
+
+       if (cmdline_read(cmds, argc, argv)
+           || cmdline_get(cmds, "help", NULL)) {
+               cmdline_print_help(cmds);
+               return -1;
+       }
+
+       if (!cmdline_get_int(&cfg.port_nr, 1, 65535, 23000, cmds, "port"))
+               return -1;
+
+       cfg.local_addr = cmdline_get(cmds, "local-addr", "0.0.0.0");
+       if (osmo_sockaddr_str_from_str(&addr, cfg.local_addr, cfg.port_nr)
+           || osmo_sockaddr_str_to_osa(&addr, &osa)) {
+               printf("ERROR: invalid interface or port number: %s:%d\n", 
cfg.local_addr, cfg.port_nr);
+               return -1;
+       }
+
+       if (!cmdline_get_int(&cfg.queue_size, 1, 65535, 4000, cmds, 
"io-uring-queue"))
+               return -1;
+       if (!cmdline_get_int(&cfg.buf_size, 1, 65535, 1452, cmds, 
"io-uring-buf"))
+               return -1;
+
+       if (!cmdline_get_int(&cfg.response_size, 0, cfg.buf_size, 0, cmds, 
"response-size"))
+               return -1;
+
+       if (!cmdline_get_int(&cfg.response_n, 0, INT_MAX, 1, cmds, 
"response-repeat"))
+               return -1;
+
+       if (!cmdline_get_int(&cfg.workers_n, 1, INT_MAX, 4, cmds, "workers"))
+               return -1;
+
+       if (cmdline_get(cmds, "cpu-affinity", NULL))
+               cfg.cpu_affinity = true;
+
+       cfg.port.osa = osa;
+       cfg.control_size = 1024;
+
+       /* create and bind socket */
+       rc = osmo_sock_init_osa(SOCK_DGRAM, IPPROTO_UDP, &cfg.port.osa, NULL, 
OSMO_SOCK_F_BIND);
+       /* (logging of errors already happens in osmo_sock_init_osa() */
+       if (rc < 0)
+               return -1;
+       cfg.port.fd = rc;
+       LOGP(DLGLOBAL, LOGL_NOTICE, "bound UDP %s fd=%d\n", 
osmo_sock_get_name2(cfg.port.fd), cfg.port.fd);
+
+       /* Set Don't Fragment (DF) bit on IP packets transmitted by socket: */
+       val = IP_PMTUDISC_DO;
+       rc = setsockopt(cfg.port.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, 
sizeof(val));
+       if (rc == -1) {
+               fprintf(stderr, "ERROR: setsockopt(IPPROTO_IP, IP_DONTFRAG) 
failed errno=%d\n", errno);
+               return -1;
+       }
+
+       val = 1;
+       rc = setsockopt(cfg.port.fd, IPPROTO_IP, IP_PKTINFO, &val, sizeof(val));
+       /* TODO: IPv6: setsockopt(s, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, 
sizeof(one)); */
+       if (rc == -1) {
+               fprintf(stderr, "ERROR: setsockopt(IPPROTO_IP, IP_PKTINFO) 
failed errno=%d\n", errno);
+               return -1;
+       }
+
+       struct worker *workers = talloc_zero_array(OTC_GLOBAL, struct worker, 
cfg.workers_n);
+       for (int i = 0; i < cfg.workers_n; i++) {
+               printf("Starting worker %d\n", i);
+               workers[i].id = i;
+               start_rx_worker(&workers[i]);
+       }
+
+       /* periodically log rx stats */
+       while (1) {
+               static struct timespec last_info_log = {.tv_sec = 0, .tv_nsec = 
0};
+               struct timespec ts_now;
+
+               if (OSMO_UNLIKELY(!started))
+                       continue;
+
+               clock_gettime(CLOCK_MONOTONIC, &ts_now);
+               /* the resolution is in seconds, output stats once per second. 
*/
+               if (OSMO_UNLIKELY(ts_now.tv_sec != last_info_log.tv_sec)) {
+                       struct timespec ts_elapsed, ts_diff;
+                       unsigned long long elapsed_usec, diff_usec;
+                       uint64_t diff_rx_packets, diff_rx_bytes, 
diff_tx_packets, diff_tx_bytes;
+                       uint64_t elapsed_rx_packets = 0, elapsed_rx_bytes = 0, 
elapsed_tx_packets = 0, elapsed_tx_bytes = 0;
+
+                       timespecsub(&ts_now, &ts_start, &ts_elapsed);
+                       timespecsub(&ts_now, &last_info_log, &ts_diff);
+                       elapsed_usec = (ts_elapsed.tv_sec * 1000 * 1000) + 
(ts_elapsed.tv_nsec / 1000);
+                       diff_usec = (ts_diff.tv_sec * 1000 * 1000) + 
(ts_diff.tv_nsec / 1000);
+                       if (elapsed_usec == 0)
+                               elapsed_usec = 1;
+                       if (diff_usec == 0)
+                               diff_usec = 1;
+                       last_info_log = ts_now;
+
+                       for (int i = 0; i < cfg.workers_n; i++) {
+                               elapsed_rx_packets += workers[i].ctr.rx_packets;
+                               elapsed_rx_bytes += workers[i].ctr.rx_bytes;
+                               elapsed_tx_packets += workers[i].ctr.tx_packets;
+                               elapsed_tx_bytes += workers[i].ctr.tx_bytes;
+                       }
+                       g_rx.packets.count = elapsed_rx_packets;
+                       g_rx.bytes.count = elapsed_rx_bytes;
+                       g_tx.packets.count = elapsed_tx_packets;
+                       g_tx.bytes.count = elapsed_tx_bytes;
+
+                       diff_rx_packets = counter_get(&g_rx.packets);
+                       diff_rx_bytes = counter_get(&g_rx.bytes);
+                       diff_tx_packets = counter_get(&g_tx.packets);
+                       diff_tx_bytes = counter_get(&g_tx.bytes);
+
+                       if (diff_rx_packets || diff_tx_packets) {
+                               printf("DIFF:  %12llu Rx: %8"PRIu64" packets 
(%6llu kPPS, %8llu Mbps) | Tx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n",
+                                      diff_usec,
+                                      diff_rx_packets,
+                                      diff_rx_packets * 1000 / diff_usec,
+                                      diff_rx_bytes * 8 / diff_usec,
+                                      diff_tx_packets,
+                                      diff_tx_packets * 1000 / diff_usec,
+                                      diff_tx_bytes * 8 / diff_usec);
+                               printf("TOTAL: %12llu Rx: %8"PRIu64" packets 
(%6llu kPPS, %8llu Mbps) | Tx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n",
+                                      elapsed_usec,
+                                      elapsed_rx_packets,
+                                      elapsed_rx_packets * 1000 / elapsed_usec,
+                                      elapsed_rx_bytes * 8 / elapsed_usec,
+                                      elapsed_tx_packets,
+                                      elapsed_tx_packets * 1000 / elapsed_usec,
+                                      elapsed_tx_bytes * 8 / elapsed_usec);
+                               fflush(stdout);
+                       }
+               }
+               int rc = usleep(500 * 1000);
+               if (rc == -1)
+                       fprintf(stderr, "ERROR: usleep() failed errno=%d\n", 
errno);
+       }
+}
+
+static void *rx_worker_func(void *_worker);
+
+static void start_rx_worker(struct worker *w)
+{
+       w->q = talloc_zero_size(OTC_GLOBAL, sizeof(struct io_queue) + 
cfg.queue_size * sizeof(struct data_io));
+       OSMO_ASSERT(w->q);
+       *w->q = (struct io_queue){
+               .d_size = cfg.queue_size,
+       };
+       for (int i = 0; i < w->q->d_size; i++) {
+               struct data_io *d = &w->q->d[i];
+               data_io_prepare(w, d);
+       }
+       int rc = pthread_create(&w->worker, NULL, rx_worker_func, w);
+       OSMO_ASSERT(rc >= 0);
+}
+
+static void *rx_worker_func(void *_worker)
+{
+       struct worker *w = _worker;
+       struct io_queue *q = w->q;
+       char thread_name[32];
+       pthread_t phtread_id = pthread_self();
+
+       if (cfg.cpu_affinity) {
+               cpu_set_t *cpuset;
+               size_t cpuset_size;
+               int num_cpus = get_num_cpus();
+               int core_id = w->id % num_cpus;
+
+               cpuset = CPU_ALLOC(num_cpus);
+               cpuset_size = CPU_ALLOC_SIZE(num_cpus);
+               CPU_ZERO_S(cpuset_size, cpuset);
+               CPU_SET_S(core_id, cpuset_size, cpuset);
+
+               fprintf(stderr, "Pinning worker %d to CPU %d\n", w->id, 
core_id);
+               int rc = pthread_setaffinity_np(pthread_self(), cpuset_size, 
cpuset) != 0;
+               if (rc != 0) {
+                       fprintf(stderr, "ERROR: Pinning worker %d to CPU %d: 
error=%d\n", w->id, core_id, rc);
+                       exit(1);
+               }
+               CPU_FREE(cpuset);
+       }
+       snprintf(thread_name, sizeof(thread_name), "UdpRspWrk%u", w->id);
+       if (pthread_setname_np(phtread_id, thread_name) != 0) {
+               char buf[256];
+               int err = errno;
+               char *err_str = strerror_r(err, buf, sizeof(buf));
+               fprintf(stderr, "worker %u: failed setting thread name: %s\n",
+                       w->id, err_str);
+       }
+
+       int rc = io_uring_queue_init(q->d_size, &w->ring, 0);
+       OSMO_ASSERT(rc >= 0);
+       unsigned int vals[2] = {1, 1};
+       rc = io_uring_register_iowq_max_workers(&w->ring, &vals[0]);
+       OSMO_ASSERT(rc == 0);
+
+       for (int i = 0; i < q->d_size; i++) {
+               struct data_io *d = &q->d[i];
+               /* fill once with random printable data */
+               for (int j = 0; j < d->data_size; j++)
+                       d->data[j] = 32 + random() % (126 - 32 + 1);
+       }
+
+       /* fill the queue to start receiving */
+       for (int i = 0; i < q->d_size; i++)
+               data_io_prep_recv(&w->ring, &cfg.port, &q->d[i]);
+
+       struct __kernel_timespec ts_zero = {};
+       struct __kernel_timespec ts_1s = { .tv_sec = 1 };
+
+       while (1) {
+               uint32_t submitted;
+               uint32_t completed = 0;
+               struct io_uring_cqe *cqe;
+
+               /* submit any requests from previous loop */
+               submitted = io_uring_submit(&w->ring);
+
+               /* process all pending completions */
+               while (io_uring_wait_cqe_timeout(&w->ring, &cqe, &ts_zero) == 
0) {
+                       data_io_handle_completion(w, &cfg.port, cqe, 
cfg.response_size, cfg.response_n);
+                       completed++;
+               }
+
+               /* Wait a bit longer */
+               if (!submitted && !completed) {
+                       if (io_uring_wait_cqe_timeout(&w->ring, &cqe, &ts_1s) 
== 0) {
+                               data_io_handle_completion(w, &cfg.port, cqe, 
cfg.response_size, cfg.response_n);
+                               completed++;
+                       }
+               }
+       }
+
+       talloc_free(q);
+       return 0;
+}
+
+#endif /* HAVE_URING */
+

--
To view, visit https://gerrit.osmocom.org/c/upf-benchmark/+/38328?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: upf-benchmark
Gerrit-Branch: master
Gerrit-Change-Id: I9edc2e3e1e41767673bcf96f1fe97fa4bf6d60f7
Gerrit-Change-Number: 38328
Gerrit-PatchSet: 2
Gerrit-Owner: pespin <[email protected]>
Gerrit-Reviewer: laforge <[email protected]>
Gerrit-Reviewer: neels <[email protected]>

Reply via email to