laforge has submitted this change. ( 
https://gerrit.osmocom.org/c/libosmocore/+/40493?usp=email )

 (

10 is the latest approved patch-set.
No files were changed between the latest approved patch-set and the submitted 
one.
 )

Change subject: Add multiple messages buffers to io_uring write operations
......................................................................

Add multiple messages buffers to io_uring write operations

Multiple message buffers can be written by submitting a single SQE when
using io_uring. If less data is written than requested, the completely
written buffers are removed and the partly written buffers are truncated.
Afterwards they are re-queued for the next write operation.

Having more than one buffer is optional, and the number can be controlled
via an environment variable.

Related: OS#6705
Change-Id: I8c4e0a785cf66becd7fb5b2caf718c9724b56686
---
M src/core/osmo_io.c
M src/core/osmo_io_uring.c
2 files changed, 103 insertions(+), 46 deletions(-)

Approvals:
  laforge: Looks good to me, but someone else must approve
  pespin: Looks good to me, approved
  Jenkins Builder: Verified




diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index 46885aa..bfc75c6 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -417,42 +417,74 @@
  */
 void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct 
iofd_msghdr *msghdr)
 {
-       struct msgb *msg = msghdr->msg[0];
+       int idx, i;

-       /* Incomplete write */
-       if (rc > 0 && rc < msgb_length(msg)) {
-               /* Re-enqueue remaining data */
-               msgb_pull(msg, rc);
-               msghdr->iov[0].iov_len = msgb_length(msg);
-               iofd_txqueue_enqueue_front(iofd, msghdr);
-               return;
-       }
-
-       /* Reenqueue the complete msgb */
+       /* Re-enqueue the complete msgb. */
        if (rc == -EAGAIN) {
                iofd_txqueue_enqueue_front(iofd, msghdr);
                return;
        }

-       /* All other failure and success cases are handled here */
-       switch (msghdr->action) {
-       case IOFD_ACT_WRITE:
-               if (iofd->io_ops.write_cb)
-                       iofd->io_ops.write_cb(iofd, rc, msg);
-               break;
-       case IOFD_ACT_SENDTO:
-               if (iofd->io_ops.sendto_cb)
-                       iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
-               break;
-       case IOFD_ACT_SENDMSG:
-               if (iofd->io_ops.sendmsg_cb)
-                       iofd->io_ops.sendmsg_cb(iofd, rc, msg);
-               break;
-       default:
-               OSMO_ASSERT(0);
-       }
+       for (idx = 0; idx < msghdr->io_len; idx++) {
+               struct msgb *msg = msghdr->msg[idx];
+               int chunk;

-       msgb_free(msghdr->msg[0]);
+               /* Incomplete write */
+               if (rc > 0 && rc < msgb_length(msg)) {
+                       /* Keep msg with unsent data only. */
+                       msgb_pull(msg, rc);
+                       msghdr->iov[idx].iov_len = msgb_length(msg);
+                       /* Shift all existing buffers down. */
+                       if (idx) {
+                               msghdr->io_len -= idx;
+                               for (i = 0; i < msghdr->io_len; i++) {
+                                       msghdr->iov[i] = msghdr->iov[idx + i];
+                                       msghdr->msg[i] = msghdr->msg[idx + i];
+                               }
+                               for (i = 0; i < idx; i++) {
+                                       memset(&msghdr->iov[msghdr->io_len + 
i], 0, sizeof(struct iovec));
+                                       msghdr->msg[msghdr->io_len + i] = NULL;
+                               }
+                               msghdr->hdr.msg_iovlen = msghdr->io_len;
+                       }
+                       /* Re-enqueue remaining buffers. */
+                       iofd_txqueue_enqueue_front(iofd, msghdr);
+                       return;
+               }
+
+               if (rc >= 0) {
+                       chunk = msgb_length(msg);
+                       if (rc < chunk)
+                               chunk = rc;
+               } else {
+                       chunk = rc;
+               }
+
+               /* All other failure and success cases are handled here */
+               switch (msghdr->action) {
+               case IOFD_ACT_WRITE:
+                       if (iofd->io_ops.write_cb)
+                               iofd->io_ops.write_cb(iofd, chunk, msg);
+                       break;
+               case IOFD_ACT_SENDTO:
+                       if (iofd->io_ops.sendto_cb)
+                               iofd->io_ops.sendto_cb(iofd, chunk, msg, 
&msghdr->osa);
+                       break;
+               case IOFD_ACT_SENDMSG:
+                       if (iofd->io_ops.sendmsg_cb)
+                               iofd->io_ops.sendmsg_cb(iofd, chunk, msg);
+                       break;
+               default:
+                       OSMO_ASSERT(0);
+               }
+
+               msgb_free(msghdr->msg[idx]);
+               msghdr->msg[idx] = NULL;
+
+               /* The user can unregister/close the iofd during callback 
above. */
+               if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_FD_REGISTERED))
+                       break;
+       }
        iofd_msghdr_free(msghdr);
 }

@@ -475,6 +507,8 @@
 int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg)
 {
        int rc;
+       struct iofd_msghdr *msghdr;
+       int idx;

        if (OSMO_UNLIKELY(msgb_length(msg) == 0)) {
                LOGPIO(iofd, LOGL_ERROR, "Length is 0, rejecting msgb.\n");
@@ -483,21 +517,35 @@

        OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE);

-       struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, 
msg, 0);
-       if (!msghdr)
-               return -ENOMEM;
+       /* Always try to add msg to last msghdr. Only if it is completely 
filled, allocate a new msghdr.
+        * This way all the previous meghdrs in the queue are completely 
filled. */
+       msghdr = llist_last_entry_or_null(&iofd->tx_queue.msg_queue, struct 
iofd_msghdr, list);
+       if (msghdr && msghdr->io_len < iofd->io_write_buffers) {
+               /* Add msg to existing msghdr. */
+               msghdr->msg[msghdr->io_len++] = msg;
+       } else {
+               /* Create new msghdr with msg. */
+               msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0);
+               if (!msghdr)
+                       return -ENOMEM;
+               msghdr->hdr.msg_iov = &msghdr->iov[0];
+               msghdr->flags = MSG_NOSIGNAL;
+       }

-       msghdr->flags = MSG_NOSIGNAL;
-       msghdr->iov[0].iov_base = msgb_data(msghdr->msg[0]);
-       msghdr->iov[0].iov_len = msgb_length(msghdr->msg[0]);
-       msghdr->hdr.msg_iov = &msghdr->iov[0];
-       msghdr->hdr.msg_iovlen = 1;
+       /* Add set IO vector to msg. */
+       idx = msghdr->io_len - 1;
+       msghdr->iov[idx].iov_base = msgb_data(msg);
+       msghdr->iov[idx].iov_len = msgb_length(msg);
+       msghdr->hdr.msg_iovlen = msghdr->io_len;

-       rc = iofd_txqueue_enqueue(iofd, msghdr);
-       if (rc < 0) {
-               iofd_msghdr_free(msghdr);
-               LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). 
Rejecting msgb\n", rc);
-               return rc;
+       /* Only new msghdr will be enqueued. */
+       if (msghdr->io_len == 1) {
+               rc = iofd_txqueue_enqueue(iofd, msghdr);
+               if (rc < 0) {
+                       iofd_msghdr_free(msghdr);
+                       LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed 
(%d). Rejecting msgb\n", rc);
+                       return rc;
+               }
        }

        return 0;
@@ -884,7 +932,10 @@
 {
        struct iofd_msghdr *hdr;
        while ((hdr = iofd_txqueue_dequeue(iofd))) {
-               msgb_free(hdr->msg[0]);
+               for (int idx = 0; idx < hdr->io_len; idx++) {
+                       msgb_free(hdr->msg[idx]);
+                       hdr->msg[idx] = NULL;
+               }
                iofd_msghdr_free(hdr);
        }
 }
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index 754a39a..018bdab 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -262,7 +262,10 @@
                iofd->u.uring.write_msghdr = NULL;

        if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
-               msgb_free(msghdr->msg[0]);
+               for (int idx = 0; idx < msghdr->io_len; idx++) {
+                       msgb_free(msghdr->msg[idx]);
+                       msghdr->msg[idx] = NULL;
+               }
                iofd_msghdr_free(msghdr);
        } else {
                iofd_handle_send_completion(iofd, rc, msghdr);
@@ -351,7 +354,7 @@

        switch (msghdr->action) {
        case IOFD_ACT_WRITE:
-               io_uring_prep_writev(sqe, msghdr->iofd->fd, msghdr->iov, 1, -1);
+               io_uring_prep_writev(sqe, msghdr->iofd->fd, msghdr->iov, 
msghdr->io_len, -1);
                break;
        case IOFD_ACT_SENDTO:
        case IOFD_ACT_SENDMSG:
@@ -458,7 +461,10 @@
                LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
                iofd->u.uring.write_msghdr = NULL;
                talloc_steal(OTC_GLOBAL, msghdr);
-               msgb_free(msghdr->msg[0]);
+               for (int idx = 0; idx < msghdr->io_len; idx++) {
+                       msgb_free(msghdr->msg[idx]);
+                       msghdr->msg[idx] = NULL;
+               }
                msghdr->iofd = NULL;
                io_uring_prep_cancel(sqe, msghdr, 0);
        }

--
To view, visit https://gerrit.osmocom.org/c/libosmocore/+/40493?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: I8c4e0a785cf66becd7fb5b2caf718c9724b56686
Gerrit-Change-Number: 40493
Gerrit-PatchSet: 14
Gerrit-Owner: jolly <andr...@eversberg.eu>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <lafo...@osmocom.org>
Gerrit-Reviewer: pespin <pes...@sysmocom.de>

Reply via email to