jolly has uploaded this change for review. ( 
https://gerrit.osmocom.org/c/libosmocore/+/40856?usp=email )


Change subject: Put all io_uring related read and write states into sub 
structures
......................................................................

Put all io_uring related read and write states into sub structures

Related: OS#6705
Change-Id: I3cf4ab6d9aebf5532ad174c90c7b0e9633491c88
---
M src/core/osmo_io.c
M src/core/osmo_io_internal.h
M src/core/osmo_io_uring.c
3 files changed, 69 insertions(+), 62 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/libosmocore refs/changes/56/40856/1

diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index 6a5dcce..6e19231 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -893,7 +893,7 @@
        if (sqes < 1 || sqes > IOFD_MSGHDR_MAX_READ_SQES)
                return -EINVAL;

-       iofd->u.uring.num_read_sqes = sqes;
+       iofd->u.uring.read.num_sqes = sqes;
        return 0;
 }

diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index 3d92b85..b417e37 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -107,19 +107,26 @@
                        struct osmo_fd ofd;
                } poll;
                struct {
-                       bool read_enabled;
-                       bool write_enabled;
-                       /*! requested number of simultaniously submitted read 
SQEs */
-                       uint8_t num_read_sqes;
-                       /*! array of simultaneously submitted read SQEs */
-                       void *read_msghdr[IOFD_MSGHDR_MAX_READ_SQES];
-                       /*! ring the read SQEs have been submitted to */
-                       struct io_uring *read_ring;
-                       /*! current number of simultaneously submitted read 
SQEs */
-                       uint8_t reads_submitted;
-                       void *write_msghdr;
-                       /*! ring the write SQE has been submitted to */
-                       struct io_uring *write_ring;
+                       struct {
+                               /*! read is enabled, due to registration of 
callback function */
+                               bool enabled;
+                               /*! requested number of simultaniously 
submitted read SQEs */
+                               uint8_t num_sqes;
+                               /*! array of simultaneously submitted read SQEs 
*/
+                               void *msghdr[IOFD_MSGHDR_MAX_READ_SQES];
+                               /*! ring the read SQEs have been submitted to */
+                               struct io_uring *ring;
+                               /*! current number of simultaneously submitted 
read SQEs */
+                               uint8_t sqes_submitted;
+                       } read;
+                       struct {
+                               /*! write is enabled, due to pending msghdr in 
tx_queue */
+                               bool enabled;
+                               /*! submitted write SQE */
+                               void *msghdr;
+                               /*! ring the write SQE has been submitted to */
+                               struct io_uring *ring;
+                       } write;
                        /* TODO: index into array of registered fd's? */
                        /* osmo_fd for non-blocking connect handling */
                        struct osmo_fd connect_ofd;
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index a867968..b414c0d 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -232,13 +232,13 @@
        uint8_t idx;

        /* All subsequent read SQEs must be on the same ring. */
-       if (iofd->u.uring.reads_submitted > 0 && iofd->u.uring.read_ring != 
&g_ring->ring)
+       if (iofd->u.uring.read.sqes_submitted > 0 && iofd->u.uring.read.ring != 
&g_ring->ring)
                return -EINVAL;

        /* Tell iofd_uring_get_sqe() not to allocate a new ring, if we want to 
enqueue multiple read SQEs. */
-       sqe = iofd_uring_get_sqe(iofd->u.uring.reads_submitted > 0);
+       sqe = iofd_uring_get_sqe(iofd->u.uring.read.sqes_submitted > 0);
        if (!sqe) {
-               if (iofd->u.uring.reads_submitted > 0)
+               if (iofd->u.uring.read.sqes_submitted > 0)
                        return -EINVAL;
                LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
                OSMO_ASSERT(0);
@@ -293,9 +293,9 @@

        iofd_io_uring_submit();

-       iofd->u.uring.read_msghdr[iofd->u.uring.reads_submitted] = msghdr;
-       iofd->u.uring.reads_submitted++;
-       iofd->u.uring.read_ring = &g_ring->ring;
+       iofd->u.uring.read.msghdr[iofd->u.uring.read.sqes_submitted] = msghdr;
+       iofd->u.uring.read.sqes_submitted++;
+       iofd->u.uring.read.ring = &g_ring->ring;

        return 0;
 }
@@ -305,7 +305,7 @@
        int rc;

        /* Submit more read SQEs in advance, if requested. */
-       while (iofd->u.uring.reads_submitted < ((iofd->u.uring.num_read_sqes) ? 
: g_io_uring_read_sqes)) {
+       while (iofd->u.uring.read.sqes_submitted < 
((iofd->u.uring.read.num_sqes) ? : g_io_uring_read_sqes)) {
                rc = iofd_uring_submit_recv_sqe(iofd, action);
                /* Stop, if we cannot enqueue multiple read SQEs in the same 
ring. */
                if (rc < 0)
@@ -320,19 +320,19 @@
        uint8_t idx, i;

        /* Find which read_msghdr is completed and remove from list. */
-       for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) {
-               if (iofd->u.uring.read_msghdr[idx] == msghdr)
+       for (idx = 0; idx < iofd->u.uring.read.sqes_submitted; idx++) {
+               if (iofd->u.uring.read.msghdr[idx] == msghdr)
                        break;
        }
-       if (idx == iofd->u.uring.reads_submitted) {
+       if (idx == iofd->u.uring.read.sqes_submitted) {
                LOGP(DLIO, LOGL_FATAL, "Read SQE completion, but msghdr not 
found, please fix!\n");
                return;
        }
        /* Remove entry at idx. */
-       iofd->u.uring.reads_submitted--;
-       for (i = idx; i < iofd->u.uring.reads_submitted; i++)
-               iofd->u.uring.read_msghdr[i] = iofd->u.uring.read_msghdr[i + 1];
-       iofd->u.uring.read_msghdr[i] = NULL;
+       iofd->u.uring.read.sqes_submitted--;
+       for (i = idx; i < iofd->u.uring.read.sqes_submitted; i++)
+               iofd->u.uring.read.msghdr[i] = iofd->u.uring.read.msghdr[i + 1];
+       iofd->u.uring.read.msghdr[i] = NULL;

        for (idx = 0; idx < msghdr->io_len; idx++) {
                struct msgb *msg = msghdr->msg[idx];
@@ -351,7 +351,7 @@
                }

                /* Check for every iteration, because iofd might get 
unregistered/closed during receive function. */
-               if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
+               if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
                        iofd_handle_recv(iofd, msg, chunk, msghdr);
                else
                        msgb_free(msg);
@@ -364,7 +364,7 @@
                msghdr->msg[idx] = NULL;
        }

-       if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
+       if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
                iofd_uring_submit_recv(iofd, msghdr->action);

        iofd_msghdr_free(msghdr);
@@ -380,12 +380,12 @@
        /* Detach msghdr from iofd. It might get freed here or it is freed 
during iofd_handle_send_completion().
         * If there is pending data to send, iofd_uring_submit_tx() will attach 
it again.
         * iofd_handle_send_completion() will invoke a callback function to 
signal the possibility of write/send.
-        * This callback function might close iofd, leading to the potential 
freeing of iofd->u.uring.write_msghdr if
+        * This callback function might close iofd, leading to the potential 
freeing of iofd->u.uring.write.msghdr if
         * still attached. Since iofd_handle_send_completion() frees msghdr at 
the end of the function, detaching
         * msghdr here prevents a double-free bug. */
-       if (iofd->u.uring.write_msghdr == msghdr) {
-               iofd->u.uring.write_msghdr = NULL;
-               iofd->u.uring.write_ring = NULL;
+       if (iofd->u.uring.write.msghdr == msghdr) {
+               iofd->u.uring.write.msghdr = NULL;
+               iofd->u.uring.write.ring = NULL;
        }

        if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
@@ -399,7 +399,7 @@
        }

        /* submit the next to-be-transmitted message for this file descriptor */
-       if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
+       if (iofd->u.uring.write.enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
                iofd_uring_submit_tx(iofd);
 }

@@ -427,7 +427,7 @@

        IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);

-       if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && 
!iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr)
+       if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && 
!iofd->u.uring.read.sqes_submitted && !iofd->u.uring.write.msghdr)
                talloc_free(iofd);
 }

@@ -518,8 +518,8 @@

        iofd_io_uring_submit();

-       iofd->u.uring.write_msghdr = msghdr;
-       iofd->u.uring.write_ring = &g_ring->ring;
+       iofd->u.uring.write.msghdr = msghdr;
+       iofd->u.uring.write.ring = &g_ring->ring;

        return 0;
 }
@@ -558,12 +558,12 @@
        IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);

        /* If write/read notifications are pending, enable it now. */
-       if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
+       if (iofd->u.uring.write.enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
                iofd_uring_write_enable(iofd);
-       if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
+       if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, 
IOFD_FLAG_CLOSED))
                iofd_uring_read_enable(iofd);

-       if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && 
!iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr)
+       if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && 
!iofd->u.uring.read.sqes_submitted && !iofd->u.uring.write.msghdr)
                talloc_free(iofd);
        return 0;
 }
@@ -595,10 +595,10 @@
        struct iofd_msghdr *msghdr;
        uint8_t idx;

-       for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) {
-               struct osmo_io_uring *ring = 
container_of(iofd->u.uring.read_ring, struct osmo_io_uring, ring);
-               msghdr = iofd->u.uring.read_msghdr[idx];
-               iofd->u.uring.read_msghdr[idx] = NULL;
+       for (idx = 0; idx < iofd->u.uring.read.sqes_submitted; idx++) {
+               struct osmo_io_uring *ring = 
container_of(iofd->u.uring.read.ring, struct osmo_io_uring, ring);
+               msghdr = iofd->u.uring.read.msghdr[idx];
+               iofd->u.uring.read.msghdr[idx] = NULL;
                /* Submit SQEs of the current ring, if needed. */
                if (&ring->ring == &g_ring->ring)
                        osmo_io_uring_submit();
@@ -617,15 +617,15 @@
                talloc_steal(OTC_GLOBAL, msghdr);
                msghdr->iofd = NULL;
        }
-       if (iofd->u.uring.reads_submitted) {
-               iofd->u.uring.read_ring = NULL;
-               iofd->u.uring.reads_submitted = 0;
+       if (iofd->u.uring.read.sqes_submitted) {
+               iofd->u.uring.read.ring = NULL;
+               iofd->u.uring.read.sqes_submitted = 0;
        }

-       if (iofd->u.uring.write_msghdr) {
-               struct osmo_io_uring *ring = 
container_of(iofd->u.uring.write_ring, struct osmo_io_uring, ring);
-               msghdr = iofd->u.uring.write_msghdr;
-               iofd->u.uring.write_msghdr = NULL;
+       if (iofd->u.uring.write.msghdr) {
+               struct osmo_io_uring *ring = 
container_of(iofd->u.uring.write.ring, struct osmo_io_uring, ring);
+               msghdr = iofd->u.uring.write.msghdr;
+               iofd->u.uring.write.msghdr = NULL;
                for (int idx = 0; idx < msghdr->io_len; idx++) {
                        msgb_free(msghdr->msg[idx]);
                        msghdr->msg[idx] = NULL;
@@ -647,7 +647,7 @@
                }
                talloc_steal(OTC_GLOBAL, msghdr);
                msghdr->iofd = NULL;
-               iofd->u.uring.write_ring = NULL;
+               iofd->u.uring.write.ring = NULL;
        }

        if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_NOTIFY_CONNECTED)) {
@@ -660,9 +660,9 @@

 static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
 {
-       iofd->u.uring.write_enabled = true;
+       iofd->u.uring.write.enabled = true;

-       if (iofd->u.uring.write_msghdr)
+       if (iofd->u.uring.write.msghdr)
                return;

        /* This function is called again, once the socket is connected. */
@@ -699,21 +699,21 @@

                iofd_io_uring_submit();

-               iofd->u.uring.write_msghdr = msghdr;
-               iofd->u.uring.write_ring = &g_ring->ring;
+               iofd->u.uring.write.msghdr = msghdr;
+               iofd->u.uring.write.ring = &g_ring->ring;
        }
 }

 static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
 {
-       iofd->u.uring.write_enabled = false;
+       iofd->u.uring.write.enabled = false;
 }

 static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
 {
-       iofd->u.uring.read_enabled = true;
+       iofd->u.uring.read.enabled = true;

-       if (iofd->u.uring.reads_submitted)
+       if (iofd->u.uring.read.sqes_submitted)
                return;

        /* This function is called again, once the socket is connected. */
@@ -737,7 +737,7 @@

 static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
 {
-       iofd->u.uring.read_enabled = false;
+       iofd->u.uring.read.enabled = false;
 }

 static int iofd_uring_close(struct osmo_io_fd *iofd)
@@ -753,7 +753,7 @@
        }

        /* OSMO_IO_FD_MODE_RECVMSG_SENDMSG: Don't call this function after 
enabling read or write. */
-       OSMO_ASSERT(!iofd->u.uring.write_enabled && 
!iofd->u.uring.read_enabled);
+       OSMO_ASSERT(!iofd->u.uring.write.enabled && 
!iofd->u.uring.read.enabled);

        /* Set flag to enable temporary osmo_fd during register() time: */
        IOFD_FLAG_SET(iofd, IOFD_FLAG_NOTIFY_CONNECTED);

--
To view, visit https://gerrit.osmocom.org/c/libosmocore/+/40856?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: I3cf4ab6d9aebf5532ad174c90c7b0e9633491c88
Gerrit-Change-Number: 40856
Gerrit-PatchSet: 1
Gerrit-Owner: jolly <andr...@eversberg.eu>

Reply via email to