laforge has submitted this change. ( 
https://gerrit.osmocom.org/c/osmo-mgw/+/36363?usp=email )

Change subject: Convert RTP/RTCP/OSMUX I/O from osmo_fd to osmo_io
......................................................................

Convert RTP/RTCP/OSMUX I/O from osmo_fd to osmo_io

Converting from osmo_fd to osmo_io allows us to switch to the new
io_uring backend and benefit from related performance benefits.

In a benchmark running 200 concurrent bi-directional voice calls with
GSM-EFR codec, I am observing:

* the code before this patch uses 40..42% of a single core on a
  Ryzen 5950X at 200 calls (=> 200 endpoints with each two connections)

* no increase in CPU utilization before/after this patch, i.e. the
  osmo_io overhead for the osmo_fd backend is insignificant compared
  to the direct osmo_fd mode before

* an almost exactly 50% reduction of CPU utilization when running the
  same osmo-mgw build with LIBOSMO_IO_BACKEND=IO_URING - top shows
  19..21% for the same workload instead of 40..42% with the OSMO_FD
  default backend.

* An increase of about 4 Megabytes in both RSS and VIRT size when
  enabling the OSMO_IO backend.  This is likely the memory-mapped rings.

No memory leakage is observed when using either of the backends.

Change-Id: I8471960d5d8088a70cf105f2f40dfa5d5458169a
---
M include/osmocom/mgcp/mgcp.h
M include/osmocom/mgcp/mgcp_network.h
M src/libosmo-mgcp/mgcp_conn.c
M src/libosmo-mgcp/mgcp_iuup.c
M src/libosmo-mgcp/mgcp_network.c
M src/libosmo-mgcp/mgcp_osmux.c
M tests/mgcp/mgcp_test.c
7 files changed, 236 insertions(+), 169 deletions(-)

Approvals:
  Jenkins Builder: Verified
  laforge: Looks good to me, approved




diff --git a/include/osmocom/mgcp/mgcp.h b/include/osmocom/mgcp/mgcp.h
index e61ba89..4dff4d0 100644
--- a/include/osmocom/mgcp/mgcp.h
+++ b/include/osmocom/mgcp/mgcp.h
@@ -24,6 +24,7 @@

 #include <osmocom/core/msgb.h>
 #include <osmocom/core/socket.h>
+#include <osmocom/core/osmo_io.h>
 #include <osmocom/core/write_queue.h>
 #include <osmocom/core/timer.h>
 #include <osmocom/core/logging.h>
@@ -205,4 +206,4 @@


 int mgcp_create_bind(const char *source_addr, int port, uint8_t dscp, uint8_t 
prio);
-int mgcp_udp_send(int fd, const struct osmo_sockaddr *addr, const char *buf, 
int len);
+int mgcp_udp_send(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, 
const char *buf, int len);
diff --git a/include/osmocom/mgcp/mgcp_network.h 
b/include/osmocom/mgcp/mgcp_network.h
index 1ec8979..8f6505c 100644
--- a/include/osmocom/mgcp/mgcp_network.h
+++ b/include/osmocom/mgcp/mgcp_network.h
@@ -4,6 +4,7 @@
 #include <stdbool.h>

 #include <osmocom/core/socket.h>
+#include <osmocom/core/osmo_io.h>

 #include <osmocom/mgcp/mgcp.h>

@@ -120,8 +121,8 @@
        bool rfc5993_hr_convert;

        /* Each end has a separate socket for RTP and RTCP */
-       struct osmo_fd rtp;
-       struct osmo_fd rtcp;
+       struct osmo_io_fd *rtp;
+       struct osmo_io_fd *rtcp;

        /* local UDP port number of the RTP socket; RTCP is +1 */
        int local_port;
@@ -179,7 +180,7 @@
                                 int id, int inc);
 void rtpconn_rate_ctr_inc(struct mgcp_conn_rtp *conn_rtp, struct mgcp_endpoint 
*endp,
                                 int id);
-void forward_data_tap(int fd, struct mgcp_rtp_tap *tap, struct msgb *msg);
+void forward_data_tap(struct osmo_io_fd *iofd, struct mgcp_rtp_tap *tap, 
struct msgb *msg);
 uint32_t mgcp_get_current_ts(unsigned codec_rate);

 int amr_oa_bwe_convert(struct mgcp_endpoint *endp, struct msgb *msg, bool 
target_is_oa);
diff --git a/src/libosmo-mgcp/mgcp_conn.c b/src/libosmo-mgcp/mgcp_conn.c
index d9bc573..5eb4897 100644
--- a/src/libosmo-mgcp/mgcp_conn.c
+++ b/src/libosmo-mgcp/mgcp_conn.c
@@ -106,8 +106,8 @@
        /* backpointer to the generic part of the connection */
        conn->u.rtp.conn = conn;

-       end->rtp.fd = -1;
-       end->rtcp.fd = -1;
+       end->rtp = NULL;
+       end->rtcp = NULL;
        memset(&end->addr, 0, sizeof(end->addr));
        end->rtcp_port = 0;

diff --git a/src/libosmo-mgcp/mgcp_iuup.c b/src/libosmo-mgcp/mgcp_iuup.c
index 3818d3e..7531e42 100644
--- a/src/libosmo-mgcp/mgcp_iuup.c
+++ b/src/libosmo-mgcp/mgcp_iuup.c
@@ -512,10 +512,9 @@
                 osmo_sockaddr_port(&rtp_end->addr.u.sa), 
ntohs(rtp_end->rtcp_port));

        /* Forward a copy of the RTP data to a debug ip/port */
-       forward_data_tap(rtp_end->rtp.fd, &conn_src->tap_out,
-                    msg);
+       forward_data_tap(rtp_end->rtp, &conn_src->tap_out, msg);

-       len = mgcp_udp_send(rtp_end->rtp.fd, &rtp_end->addr, (char *)hdr, 
buflen);
+       len = mgcp_udp_send(rtp_end->rtp, &rtp_end->addr, (char *)hdr, buflen);

        if (len <= 0)
                return len;
diff --git a/src/libosmo-mgcp/mgcp_network.c b/src/libosmo-mgcp/mgcp_network.c
index bdf516e..1fc2c56 100644
--- a/src/libosmo-mgcp/mgcp_network.c
+++ b/src/libosmo-mgcp/mgcp_network.c
@@ -4,6 +4,7 @@
 /*
  * (C) 2009-2012 by Holger Hans Peter Freyther <[email protected]>
  * (C) 2009-2012 by On-Waves
+ * (C) 2013-2024 by sysmocom - s.f.m.c. GmbH
  * All Rights Reserved
  *
  * This program is free software; you can redistribute it and/or modify
@@ -794,16 +795,18 @@

 /* Forward data to a debug tap. This is debug function that is intended for
  * debugging the voice traffic with tools like gstreamer */
-void forward_data_tap(int fd, struct mgcp_rtp_tap *tap, struct msgb *msg)
+void forward_data_tap(struct osmo_io_fd *iofd, struct mgcp_rtp_tap *tap, 
struct msgb *msg)
 {
        int rc;

        if (!tap->enabled)
                return;

-       rc = sendto(fd, msgb_data(msg), msgb_length(msg), 0, (struct sockaddr 
*)&tap->forward,
-                   sizeof(tap->forward));
+       struct msgb *msg2 = msgb_copy(msg, "RTP TAP Tx");
+       if (!msg2)
+               return;

+       rc = osmo_iofd_sendto_msgb(iofd, msg2, 0, &tap->forward);
        if (rc < 0)
                LOGP(DRTP, LOGL_ERROR,
                     "Forwarding tapped (debug) voice data failed.\n");
@@ -1039,29 +1042,36 @@
        return -1;
 }

-/*! send udp packet.
- *  \param[in] fd associated file descriptor.
+/*! send message buffer via udp socket.  If it succeeds, it takes ownership of 
the msgb and internally calls
+ *  msgb_free() after the aynchronous sendto() completes.  In case of error, 
the msgb is still owned by the
+ *  caller and must be free'd accordingly.
+ *  \param[in] iofd associated file descriptor.
+ *  \param[in] addr destination ip-address.
+ *  \param[in] msg message buffer that holds the data to be send.
+ *  \returns 0 in case of success (takes msgb ownership), -1 on error (doesn't 
take msgb ownership). */
+static int mgcp_udp_send_msg(struct osmo_io_fd *iofd, const struct 
osmo_sockaddr *addr, struct msgb *msg)
+{
+       LOGP(DRTP, LOGL_DEBUG, "sending %d bytes length packet to %s ...\n", 
msgb_length(msg),
+            osmo_sockaddr_to_str(addr));
+
+       return osmo_iofd_sendto_msgb(iofd, msg, 0, addr);
+}
+
+/*! send udp packet from raw buffer/length.
+ *  \param[in] iofd associated file descriptor.
  *  \param[in] addr destination ip-address.
  *  \param[in] buf buffer that holds the data to be send.
  *  \param[in] len length of the data to be sent.
  *  \returns bytes sent, -1 on error. */
-int mgcp_udp_send(int fd, const struct osmo_sockaddr *addr, const char *buf, 
int len)
+int mgcp_udp_send(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, 
const char *buf, int len)
 {
-       char ipbuf[INET6_ADDRSTRLEN];
-       size_t addr_len;
+       struct msgb *msg = msgb_alloc_c(iofd, len, "mgcp_udp_send");
+       if (!msg)
+               return -ENOMEM;
+       memcpy(msg->tail, buf, len);
+       msgb_put(msg, len);

-       LOGP(DRTP, LOGL_DEBUG,
-            "sending %d bytes length packet to %s:%u ...\n", len,
-            osmo_sockaddr_ntop(&addr->u.sa, ipbuf),
-            osmo_sockaddr_port(&addr->u.sa));
-
-       if (addr->u.sa.sa_family == AF_INET6) {
-               addr_len = sizeof(addr->u.sin6);
-       } else {
-               addr_len = sizeof(addr->u.sin);
-       }
-
-       return sendto(fd, buf, len, 0, &addr->u.sa, addr_len);
+       return mgcp_udp_send_msg(iofd, addr, msg);
 }

 /*! send RTP dummy packet (to keep NAT connection open).
@@ -1089,8 +1099,7 @@
        if (mgcp_conn_rtp_is_iuup(conn))
                rc = mgcp_conn_iuup_send_dummy(conn);
        else
-               rc = mgcp_udp_send(conn->end.rtp.fd, &conn->end.addr,
-                                  rtp_dummy_payload, 
sizeof(rtp_dummy_payload));
+               rc = mgcp_udp_send(conn->end.rtp, &conn->end.addr, 
rtp_dummy_payload, sizeof(rtp_dummy_payload));

        if (rc == -1)
                goto failed;
@@ -1101,7 +1110,7 @@
        was_rtcp = 1;
        rtcp_addr = conn->end.addr;
        osmo_sockaddr_set_port(&rtcp_addr.u.sa, ntohs(conn->end.rtcp_port));
-       rc = mgcp_udp_send(conn->end.rtcp.fd, &rtcp_addr,
+       rc = mgcp_udp_send(conn->end.rtcp, &rtcp_addr,
                           rtp_dummy_payload, sizeof(rtp_dummy_payload));

        if (rc >= 0)
@@ -1225,22 +1234,21 @@
                        );

                /* Forward a copy of the RTP data to a debug ip/port */
-               forward_data_tap(rtp_end->rtp.fd, &conn_src->tap_out,
-                            msg);
+               forward_data_tap(rtp_end->rtp, &conn_src->tap_out, msg);

-               len = mgcp_udp_send(rtp_end->rtp.fd, &rtp_end->addr,
-                                   (char *)msgb_data(msg), msgb_length(msg));
-               if (len <= 0) {
+               len = msgb_length(msg);
+
+               rc = mgcp_udp_send_msg(rtp_end->rtp, &rtp_end->addr, msg);
+               if (rc < 0) {
                        msgb_free(msg);
-                       return len;
+                       return rc;
                }

                rtpconn_rate_ctr_inc(conn_dst, endp, RTP_PACKETS_TX_CTR);
                rtpconn_rate_ctr_add(conn_dst, endp, RTP_OCTETS_TX_CTR, len);
                rtp_state->alt_rtp_tx_sequence++;

-               msgb_free(msg);
-               return len;
+               return 0;
        } else if (!trunk->omit_rtcp) {
                struct osmo_sockaddr rtcp_addr = rtp_end->addr;
                osmo_sockaddr_set_port(&rtcp_addr.u.sa, rtp_end->rtcp_port);
@@ -1251,15 +1259,19 @@
                         osmo_sockaddr_port(&rtcp_addr.u.sa)
                        );

-               len = mgcp_udp_send(rtp_end->rtcp.fd, &rtcp_addr,
-                                   (char *)msgb_data(msg), msgb_length(msg));
+               len = msgb_length(msg);
+
+               rc = mgcp_udp_send_msg(rtp_end->rtcp, &rtcp_addr, msg);
+               if (rc < 0) {
+                       msgb_free(msg);
+                       return rc;
+               }

                rtpconn_rate_ctr_inc(conn_dst, endp, RTP_PACKETS_TX_CTR);
                rtpconn_rate_ctr_add(conn_dst, endp, RTP_OCTETS_TX_CTR, len);
                rtp_state->alt_rtp_tx_sequence++;

-               msgb_free(msg);
-               return len;
+               return 0;
        }

        msgb_free(msg);
@@ -1461,7 +1473,7 @@
 }

 /* Handle incoming RTP data from NET */
-static int rtp_data_net(struct osmo_fd *fd, unsigned int what)
+static void rtp_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb 
*msg, const struct osmo_sockaddr *saddr)
 {
        /* NOTE: This is a generic implementation. RTP data is received. In
         * case of loopback the data is just sent back to its origin. All
@@ -1472,49 +1484,34 @@

        struct mgcp_conn_rtp *conn_src;
        struct mgcp_endpoint *endp;
-       struct osmo_sockaddr addr;
-       socklen_t slen = sizeof(addr);
-       char ipbuf[INET6_ADDRSTRLEN];
-       int ret;
        enum rtp_proto proto;
        struct osmo_rtp_msg_ctx *mc;
-       struct msgb *msg;
-       int rc;

-       conn_src = (struct mgcp_conn_rtp *)fd->data;
+       conn_src = (struct mgcp_conn_rtp *) osmo_iofd_get_data(iofd);
        OSMO_ASSERT(conn_src);
        endp = conn_src->conn->endp;
        OSMO_ASSERT(endp);
-       msg = msgb_alloc_c(endp->trunk, RTP_BUF_SIZE, "RTP-rx");

-       proto = (fd == &conn_src->end.rtp) ? MGCP_PROTO_RTP : MGCP_PROTO_RTCP;
+       proto = (iofd == conn_src->end.rtp) ? MGCP_PROTO_RTP : MGCP_PROTO_RTCP;

-       ret = recvfrom(fd->fd, msgb_data(msg), msg->data_len, 0, (struct 
sockaddr *)&addr.u.sa, &slen);
-
-       if (ret <= 0) {
-               LOG_CONN_RTP(conn_src, LOGL_ERROR, "recvfrom error: %s\n", 
strerror(errno));
-               rc = -1;
-               goto out;
+       if (res <= 0) {
+               LOG_CONN_RTP(conn_src, LOGL_ERROR, "recvfrom error: %s\n", 
strerror(-res));
+               goto out_free;
        }

-       msgb_put(msg, ret);
-
-       LOG_CONN_RTP(conn_src, LOGL_DEBUG, "%s: rx %u bytes from %s:%u\n",
+       LOG_CONN_RTP(conn_src, LOGL_DEBUG, "%s: rx %u bytes from %s\n",
                     proto == MGCP_PROTO_RTP ? "RTP" : "RTCP",
-                    msgb_length(msg), osmo_sockaddr_ntop(&addr.u.sa, ipbuf),
-                    osmo_sockaddr_port(&addr.u.sa));
+                    msgb_length(msg), osmo_sockaddr_to_str(saddr));
 
        if ((proto == MGCP_PROTO_RTP && check_rtp(conn_src, msg))
            || (proto == MGCP_PROTO_RTCP && check_rtcp(conn_src, msg))) {
                /* Logging happened in the two check_ functions */
-               rc = -1;
-               goto out;
+               goto out_free;
        }

        if (mgcp_is_rtp_dummy_payload(msg)) {
                LOG_CONN_RTP(conn_src, LOGL_DEBUG, "rx dummy packet 
(dropped)\n");
-               rc = 0;
-               goto out;
+               goto out_free;
        }

        /* Since the msgb remains owned and freed by this function, the msg ctx 
data struct can just be on the stack and
@@ -1523,7 +1520,7 @@
        *mc = (struct osmo_rtp_msg_ctx){
                .proto = proto,
                .conn_src = conn_src,
-               .from_addr = &addr,
+               .from_addr = (struct osmo_sockaddr *) saddr,
        };
        LOG_CONN_RTP(conn_src, LOGL_DEBUG, "msg ctx: %d %p %s\n",
                     mc->proto, mc->conn_src,
@@ -1538,12 +1535,13 @@
        /* FIXME: count RTP and RTCP separately, also count IuUP payload-less 
separately */

        /* Forward a copy of the RTP data to a debug ip/port */
-       forward_data_tap(fd->fd, &conn_src->tap_in, msg);
+       forward_data_tap(iofd, &conn_src->tap_in, msg);

-       rc = rx_rtp(msg);
+       rx_rtp(msg);
+       return;

-out:
-       return rc;
+out_free:
+       msgb_free(msg);
 }

 /* Note: This function is able to handle RTP and RTCP. msgb ownership is 
transferred, so this function or its
@@ -1590,6 +1588,24 @@
        return -1;
 }

+static void rtp_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, 
const struct osmo_sockaddr *daddr)
+{
+       /* nothing; osmo_io takes care of msgb_free */
+       if (res < 0) {
+               struct mgcp_conn_rtp *conn_rtp = (struct mgcp_conn_rtp *) 
osmo_iofd_get_data(iofd);
+               int priv_nr = osmo_iofd_get_priv_nr(iofd);
+               char errbuf[129];
+               strerror_r(-res, errbuf, sizeof(errbuf));
+               LOG_CONN_RTP(conn_rtp, LOGL_ERROR, "%s sendto(%s) failed: 
%s\n", priv_nr ? "RTCP" : "RTP",
+                        osmo_sockaddr_to_str(daddr), errbuf);
+       }
+}
+
+static const struct osmo_io_ops rtp_ioops = {
+       .recvfrom_cb = rtp_recvfrom_cb,
+       .sendto_cb = rtp_sendto_cb,
+};
+
 /*! bind RTP port to osmo_fd.
  *  \param[in] source_addr source (local) address to bind on.
  *  \param[in] port to bind on.
@@ -1617,7 +1633,7 @@
 static int bind_rtp(struct mgcp_config *cfg, const char *source_addr,
                    struct mgcp_rtp_end *rtp_end, struct mgcp_endpoint *endp)
 {
-       int rc;
+       int rc, rtp_fd, rtcp_fd;

        /* NOTE: The port that is used for RTCP is the RTP port incremented by 
one
         * (e.g. RTP-Port = 16000 ==> RTCP-Port = 16001) */
@@ -1629,7 +1645,7 @@
                         source_addr, rtp_end->local_port);
                goto cleanup0;
        }
-       rtp_end->rtp.fd = rc;
+       rtp_fd = rc;

        rc = mgcp_create_bind(source_addr, rtp_end->local_port + 1, 
cfg->endp_dscp, cfg->endp_priority);
        if (rc < 0) {
@@ -1638,16 +1654,16 @@
                         source_addr, rtp_end->local_port + 1);
                goto cleanup1;
        }
-       rtp_end->rtcp.fd = rc;
+       rtcp_fd = rc;

-       if (osmo_fd_register(&rtp_end->rtp) != 0) {
+       if (osmo_iofd_register(rtp_end->rtp, rtp_fd) < 0) {
                LOGPENDP(endp, DRTP, LOGL_ERROR,
                         "failed to register RTP port %d\n",
                         rtp_end->local_port);
                goto cleanup2;
        }

-       if (osmo_fd_register(&rtp_end->rtcp) != 0) {
+       if (osmo_iofd_register(rtp_end->rtcp, rtcp_fd) != 0) {
                LOGPENDP(endp, DRTP, LOGL_ERROR,
                         "failed to register RTCP port %d\n",
                         rtp_end->local_port + 1);
@@ -1657,13 +1673,11 @@
        return 0;

 cleanup3:
-       osmo_fd_unregister(&rtp_end->rtp);
+       osmo_iofd_unregister(rtp_end->rtp);
 cleanup2:
-       close(rtp_end->rtcp.fd);
-       rtp_end->rtcp.fd = -1;
+       close(rtcp_fd);
 cleanup1:
-       close(rtp_end->rtp.fd);
-       rtp_end->rtp.fd = -1;
+       close(rtp_fd);
 cleanup0:
        return -1;
 }
@@ -1682,7 +1696,8 @@
        snprintf(name, sizeof(name), "%s-%s", conn->conn->name, conn->conn->id);
        end = &conn->end;

-       if (end->rtp.fd != -1 || end->rtcp.fd != -1) {
+       if ((end->rtp && osmo_iofd_get_fd(end->rtp) != -1) ||
+           (end->rtcp && osmo_iofd_get_fd(end->rtcp) != -1)) {
                LOGPENDP(endp, DRTP, LOGL_ERROR, "%u was already bound on 
conn:%s\n",
                         rtp_port, mgcp_conn_dump(conn->conn));

@@ -1695,8 +1710,18 @@
        }

        end->local_port = rtp_port;
-       osmo_fd_setup(&end->rtp, -1, OSMO_FD_READ, rtp_data_net, conn, 0);
-       osmo_fd_setup(&end->rtcp, -1, OSMO_FD_READ, rtp_data_net, conn, 0);
+       end->rtp = osmo_iofd_setup(conn->conn, -1, name, 
OSMO_IO_FD_MODE_RECVFROM_SENDTO, &rtp_ioops, conn);
+       if (!end->rtp)
+               return -EIO;
+       osmo_iofd_set_alloc_info(end->rtp, RTP_BUF_SIZE, 0);
+       end->rtcp = osmo_iofd_setup(conn->conn, -1, name, 
OSMO_IO_FD_MODE_RECVFROM_SENDTO, &rtp_ioops, conn);
+       if (!end->rtcp) {
+               osmo_iofd_free(end->rtp);
+               end->rtp = NULL;
+               return -EIO;
+       }
+       osmo_iofd_set_alloc_info(end->rtcp, RTP_BUF_SIZE, 0);
+       osmo_iofd_set_priv_nr(end->rtcp, 1); /* we use priv_nr as identifier 
for RTCP */

        return bind_rtp(endp->trunk->cfg, conn->end.local_addr, end, endp);
 }
@@ -1705,15 +1730,13 @@
  *  \param[in] end RTP end */
 void mgcp_free_rtp_port(struct mgcp_rtp_end *end)
 {
-       if (end->rtp.fd != -1) {
-               osmo_fd_unregister(&end->rtp);
-               close(end->rtp.fd);
-               end->rtp.fd = -1;
+       if (end->rtp) {
+               osmo_iofd_free(end->rtp);
+               end->rtp = NULL;
        }

-       if (end->rtcp.fd != -1) {
-               osmo_fd_unregister(&end->rtcp);
-               close(end->rtcp.fd);
-               end->rtcp.fd = -1;
+       if (end->rtcp) {
+               osmo_iofd_free(end->rtcp);
+               end->rtcp = NULL;
        }
 }
diff --git a/src/libosmo-mgcp/mgcp_osmux.c b/src/libosmo-mgcp/mgcp_osmux.c
index 3df9972..df91dbc 100644
--- a/src/libosmo-mgcp/mgcp_osmux.c
+++ b/src/libosmo-mgcp/mgcp_osmux.c
@@ -1,6 +1,7 @@
 /*
  * (C) 2012-2013 by Pablo Neira Ayuso <[email protected]>
  * (C) 2012-2013 by On Waves ehf <http://www.on-waves.com>
+ * (C) 2013-2024 by sysmocom - s.f.m.c. GmbH
  * All rights not specifically granted under this license are reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
@@ -13,9 +14,11 @@
 #include <string.h> /* for memcpy */
 #include <stdlib.h> /* for abs */
 #include <inttypes.h> /* for PRIu64 */
+#include <unistd.h> /* for PRIu64 */
 #include <netinet/in.h>
 #include <osmocom/core/msgb.h>
 #include <osmocom/core/socket.h>
+#include <osmocom/core/osmo_io.h>
 #include <osmocom/core/talloc.h>

 #include <osmocom/netif/osmux.h>
@@ -30,8 +33,8 @@
 #include <osmocom/mgcp/mgcp_endp.h>
 #include <osmocom/mgcp/mgcp_trunk.h>
 
-static struct osmo_fd osmux_fd_v4;
-static struct osmo_fd osmux_fd_v6;
+static struct osmo_io_fd *osmux_fd_v4;
+static struct osmo_io_fd *osmux_fd_v6;

 static LLIST_HEAD(osmux_handle_list);

@@ -76,34 +79,31 @@
 static void osmux_deliver_cb(struct msgb *batch_msg, void *data)
 {
        struct osmux_handle *handle = data;
-       socklen_t dest_len;
-       int rc, fd;
-       struct mgcp_trunk *trunk = (struct mgcp_trunk *)osmux_fd_v4.data;
+       int rc;
+       struct osmo_io_fd *iofd;
+       struct mgcp_trunk *trunk = (struct mgcp_trunk *) 
osmo_iofd_get_data(osmux_fd_v4);
        struct rate_ctr_group *all_osmux_stats = 
trunk->ratectr.all_osmux_conn_stats;

        switch (handle->rem_addr.u.sa.sa_family) {
        case AF_INET6:
-               dest_len = sizeof(handle->rem_addr.u.sin6);
-               fd = osmux_fd_v6.fd;
+               iofd = osmux_fd_v6;
                break;
        case AF_INET:
        default:
-               dest_len = sizeof(handle->rem_addr.u.sin);
-               fd = osmux_fd_v4.fd;
+               iofd = osmux_fd_v4;
                break;
        }
-       rc = sendto(fd, batch_msg->data, batch_msg->len, 0,
-                   (struct sockaddr *)&handle->rem_addr.u.sa, dest_len);
+       rc = osmo_iofd_sendto_msgb(iofd, batch_msg, 0, &handle->rem_addr);
        if (rc < 0) {
                char errbuf[129];
-               strerror_r(errno, errbuf, sizeof(errbuf));
+               strerror_r(-rc, errbuf, sizeof(errbuf));
                LOGP(DOSMUX, LOGL_NOTICE, "osmux sendto(%s) failed: %s\n",
                         osmo_sockaddr_to_str(&handle->rem_addr), errbuf);
                rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, 
OSMUX_DROPPED_PACKETS_CTR));
+               msgb_free(batch_msg);
        } else {
                rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, 
OSMUX_PACKETS_TX_CTR));
        }
-       msgb_free(batch_msg);
 }

 /* Lookup existing OSMUX handle for specified destination address. */
@@ -325,28 +325,6 @@
        /* dispatch_rtp_cb() has taken ownership of the msgb */
 }

-static struct msgb *osmux_recv(struct osmo_fd *ofd, struct osmo_sockaddr *addr)
-{
-       struct msgb *msg;
-       socklen_t slen = sizeof(addr->u.sas);
-       int ret;
-
-       msg = msgb_alloc(4096, "OSMUX");
-       if (!msg) {
-               LOGP(DOSMUX, LOGL_ERROR, "cannot allocate message\n");
-               return NULL;
-       }
-       ret = recvfrom(ofd->fd, msg->data, msg->data_len, 0, &addr->u.sa, 
&slen);
-       if (ret <= 0) {
-               msgb_free(msg);
-               LOGP(DOSMUX, LOGL_ERROR, "cannot receive message\n");
-               return NULL;
-       }
-       msgb_put(msg, ret);
-
-       return msg;
-}
-
 /* To be called every time some AMR data is received on a connection
  * returns: 0 if conn can process data, negative if an error ocurred and data 
should not be further processed */
 static int conn_osmux_event_data_received(struct mgcp_conn_rtp *conn, const 
struct osmo_sockaddr *rem_addr)
@@ -442,22 +420,16 @@
 }

 #define osmux_chunk_length(msg, rem) ((rem) - (msg)->len)
-static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what)
+static void osmux_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb 
*msg, const struct osmo_sockaddr *rem_addr)
 {
-       struct msgb *msg;
        struct osmux_hdr *osmuxh;
-       struct osmo_sockaddr rem_addr;
-       uint32_t rem;
-       struct mgcp_trunk *trunk = ofd->data;
+       struct mgcp_trunk *trunk = osmo_iofd_get_data(iofd);
        struct rate_ctr_group *all_rtp_stats = 
trunk->ratectr.all_osmux_conn_stats;
+       uint32_t rem;
        char addr_str[64];

-       msg = osmux_recv(ofd, &rem_addr);
-       if (!msg)
-               return -1;
-
        rate_ctr_inc(rate_ctr_group_get_ctr(all_rtp_stats, 
OSMUX_PACKETS_RX_CTR));
-       osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str), &rem_addr);
+       osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str), rem_addr);

        if (trunk->cfg->osmux.usage == OSMUX_USAGE_OFF) {
                LOGP(DOSMUX, LOGL_ERROR,
@@ -467,14 +439,16 @@
        }

        /* Catch legacy dummy message and process them separately: */
-       if (msg->len == 2 && msg->data[0] == MGCP_DUMMY_LOAD)
-               return osmux_handle_legacy_dummy(trunk, &rem_addr, msg);
+       if (msg->len == 2 && msg->data[0] == MGCP_DUMMY_LOAD) {
+               osmux_handle_legacy_dummy(trunk, rem_addr, msg);
+               return;
+       }

        rem = msg->len;
        while((osmuxh = osmux_xfrm_output_pull(msg)) != NULL) {
                struct mgcp_conn_rtp *conn_src;
                conn_src = osmux_conn_lookup(trunk, osmuxh->circuit_id,
-                                            &rem_addr);
+                                            rem_addr);
                if (!conn_src) {
                        LOGP(DOSMUX, LOGL_DEBUG,
                             "Cannot find a src conn for %s CID=%d\n",
@@ -482,7 +456,7 @@
                        goto next;
                }

-               if (conn_osmux_event_data_received(conn_src, &rem_addr) < 0)
+               if (conn_osmux_event_data_received(conn_src, rem_addr) < 0)
                        goto next;

                mgcp_conn_watchdog_kick(conn_src->conn);
@@ -496,19 +470,38 @@
        }
 out:
        msgb_free(msg);
-       return 0;
 }

+static void osmux_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb 
*msg, const struct osmo_sockaddr *rem_addr)
+{
+       /* nothing; osmo_io takes care of msgb_free */
+       if (res < 0) {
+               struct mgcp_trunk *trunk = (struct mgcp_trunk *) 
osmo_iofd_get_data(iofd);
+               struct rate_ctr_group *all_osmux_stats = 
trunk->ratectr.all_osmux_conn_stats;
+               char errbuf[129];
+               strerror_r(-res, errbuf, sizeof(errbuf));
+               LOGP(DOSMUX, LOGL_NOTICE, "osmux sendto(%s) failed: %s\n", 
osmo_sockaddr_to_str(rem_addr), errbuf);
+               rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, 
OSMUX_DROPPED_PACKETS_CTR));
+       }
+}
+
+static const struct osmo_io_ops osmux_ioops = {
+       .recvfrom_cb = osmux_recvfrom_cb,
+       .sendto_cb = osmux_sendto_cb,
+};
+
 int osmux_init(struct mgcp_trunk *trunk)
 {
-       int ret;
+       int ret, fd;
        struct mgcp_config *cfg = trunk->cfg;

        /* So far we only support running on one trunk: */
        OSMO_ASSERT(trunk == mgcp_trunk_by_num(cfg, MGCP_TRUNK_VIRTUAL, 
MGCP_VIRT_TRUNK_ID));

-       osmo_fd_setup(&osmux_fd_v4, -1, OSMO_FD_READ, osmux_read_fd_cb, trunk, 
0);
-       osmo_fd_setup(&osmux_fd_v6, -1, OSMO_FD_READ, osmux_read_fd_cb, trunk, 
0);
+       osmux_fd_v4 = osmo_iofd_setup(trunk, -1, "osmux_fd_v4", 
OSMO_IO_FD_MODE_RECVFROM_SENDTO, &osmux_ioops, trunk);
+       if (!osmux_fd_v4)
+               goto out;
+       osmo_iofd_set_alloc_info(osmux_fd_v4, 4096, 0);

        if (cfg->osmux.local_addr_v4) {
                ret = mgcp_create_bind(cfg->osmux.local_addr_v4, 
cfg->osmux.local_port,
@@ -516,40 +509,55 @@
                if (ret < 0) {
                        LOGP(DOSMUX, LOGL_ERROR, "Cannot bind OSMUX IPv4 socket 
to %s:%u\n",
                             cfg->osmux.local_addr_v4, cfg->osmux.local_port);
-                       return ret;
+                       goto out_free_v4;
                }
-               osmux_fd_v4.fd = ret;
+               fd = ret;

-               ret = osmo_fd_register(&osmux_fd_v4);
+               ret = osmo_iofd_register(osmux_fd_v4, fd);
                if (ret < 0) {
-                       LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv4 
socket %s\n",
-                            osmo_sock_get_name2(osmux_fd_v4.fd));
-                       return ret;
+                       LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv4 
socket %s\n", osmo_sock_get_name2(fd));
+                       close(fd);
+                       goto out_free_v4;
                }
-               LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv4 socket listening on %s\n",
-                    osmo_sock_get_name2(osmux_fd_v4.fd));
+               LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv4 socket listening on %s\n", 
osmo_sock_get_name2(fd));
        }
+
+       osmux_fd_v6 = osmo_iofd_setup(trunk, -1, "osmux_fd_v6", 
OSMO_IO_FD_MODE_RECVFROM_SENDTO, &osmux_ioops, trunk);
+       if (!osmux_fd_v6)
+               goto out_free_v4;
+       osmo_iofd_set_alloc_info(osmux_fd_v6, 4096, 0);
+
        if (cfg->osmux.local_addr_v6) {
                ret = mgcp_create_bind(cfg->osmux.local_addr_v6, 
cfg->osmux.local_port,
                                        cfg->endp_dscp, cfg->endp_priority);
                if (ret < 0) {
                        LOGP(DOSMUX, LOGL_ERROR, "Cannot bind OSMUX IPv6 socket 
to [%s]:%u\n",
                             cfg->osmux.local_addr_v6, cfg->osmux.local_port);
-                       return ret;
+                       goto out_free_v6;
                }
-               osmux_fd_v6.fd = ret;
+               fd = ret;

-               ret = osmo_fd_register(&osmux_fd_v6);
+               ret = osmo_iofd_register(osmux_fd_v6, fd);
                if (ret < 0) {
-                       LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv6 
socket %s\n",
-                            osmo_sock_get_name2(osmux_fd_v6.fd));
-                       return ret;
+                       LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv6 
socket %s\n", osmo_sock_get_name2(fd));
+                       close(fd);
+                       goto out_free_v6;
                }
-               LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv6 socket listening on %s\n",
-                    osmo_sock_get_name2(osmux_fd_v6.fd));
+               LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv6 socket listening on %s\n", 
osmo_sock_get_name2(fd));
        }
        cfg->osmux.initialized = true;
        return 0;
+
+out_free_v6:
+       /* osmo_iofd_free performs unregister + close */
+       osmo_iofd_free(osmux_fd_v6);
+       osmux_fd_v6 = NULL;
+out_free_v4:
+       /* osmo_iofd_free performs unregister + close */
+       osmo_iofd_free(osmux_fd_v4);
+       osmux_fd_v4 = NULL;
+out:
+       return -1;
 }

 /*! relase OSXMUX cid, that had been allocated to this connection.
@@ -715,7 +723,7 @@
                 osmo_sockaddr_ntop(&conn->end.addr.u.sa, ipbuf),
                 osmo_sockaddr_port(&conn->end.addr.u.sa), 
conn->osmux.remote_cid);

-       return mgcp_udp_send(osmux_fd_v4.fd, &conn->end.addr, (char *)osmuxh, 
buf_len);
+       return mgcp_udp_send(osmux_fd_v4, &conn->end.addr, (char *)osmuxh, 
buf_len);
 }

 /* Keeps track of locally allocated Osmux circuit ID. +7 to round up to 8 bit 
boundary. */
diff --git a/tests/mgcp/mgcp_test.c b/tests/mgcp/mgcp_test.c
index c76bd9d..ffc8a20 100644
--- a/tests/mgcp/mgcp_test.c
+++ b/tests/mgcp/mgcp_test.c
@@ -653,12 +653,13 @@

 static int dummy_packets = 0;
 /* override and forward */
-ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
-              const struct sockaddr *dest_addr, socklen_t addrlen)
+int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int 
flags, const struct osmo_sockaddr *addr)
 {
        uint32_t dest_host =
-           htonl(((struct sockaddr_in *)dest_addr)->sin_addr.s_addr);
-       int dest_port = htons(((struct sockaddr_in *)dest_addr)->sin_port);
+           htonl(((struct sockaddr_in *)addr)->sin_addr.s_addr);
+       int dest_port = htons(((struct sockaddr_in *)addr)->sin_port);
+       const uint8_t *buf = msgb_data(msg);
+       size_t len = msgb_length(msg);

        if (len == sizeof(rtp_dummy_payload)
            && memcmp(buf, rtp_dummy_payload, sizeof(rtp_dummy_payload)) == 0) {
@@ -672,6 +673,8 @@
        OSMO_ASSERT(dest_host);
        OSMO_ASSERT(dest_port);

+       msgb_free(msg);
+
        return len;
 }


--
To view, visit https://gerrit.osmocom.org/c/osmo-mgw/+/36363?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings

Gerrit-Project: osmo-mgw
Gerrit-Branch: master
Gerrit-Change-Id: I8471960d5d8088a70cf105f2f40dfa5d5458169a
Gerrit-Change-Number: 36363
Gerrit-PatchSet: 6
Gerrit-Owner: laforge <[email protected]>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: dexter <[email protected]>
Gerrit-Reviewer: fixeria <[email protected]>
Gerrit-Reviewer: jolly <[email protected]>
Gerrit-Reviewer: laforge <[email protected]>
Gerrit-Reviewer: neels <[email protected]>
Gerrit-Reviewer: pespin <[email protected]>
Gerrit-MessageType: merged

Reply via email to