laforge has submitted this change. ( https://gerrit.osmocom.org/c/libosmocore/+/40493?usp=email )
( 10 is the latest approved patch-set. No files were changed between the latest approved patch-set and the submitted one. ) Change subject: Add multiple messages buffers to io_uring write operations ...................................................................... Add multiple messages buffers to io_uring write operations Multiple message buffers can be written by sending a single SQE when using io_uring. If there is less data written, the completely written buffers are removed and the partly written buffers are truncated. Afterwards they are re-queued for next write operation. Having more than one buffer is optional and the number can be controlled via environment variable. Related: OS#6705 Change-Id: I8c4e0a785cf66becd7fb5b2caf718c9724b56686 --- M src/core/osmo_io.c M src/core/osmo_io_uring.c 2 files changed, 103 insertions(+), 46 deletions(-) Approvals: laforge: Looks good to me, but someone else must approve pespin: Looks good to me, approved Jenkins Builder: Verified diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c index 46885aa..bfc75c6 100644 --- a/src/core/osmo_io.c +++ b/src/core/osmo_io.c @@ -417,42 +417,74 @@ */ void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_msghdr *msghdr) { - struct msgb *msg = msghdr->msg[0]; + int idx, i; - /* Incomplete write */ - if (rc > 0 && rc < msgb_length(msg)) { - /* Re-enqueue remaining data */ - msgb_pull(msg, rc); - msghdr->iov[0].iov_len = msgb_length(msg); - iofd_txqueue_enqueue_front(iofd, msghdr); - return; - } - - /* Reenqueue the complete msgb */ + /* Re-enqueue the complete msgb. 
*/ if (rc == -EAGAIN) { iofd_txqueue_enqueue_front(iofd, msghdr); return; } - /* All other failure and success cases are handled here */ - switch (msghdr->action) { - case IOFD_ACT_WRITE: - if (iofd->io_ops.write_cb) - iofd->io_ops.write_cb(iofd, rc, msg); - break; - case IOFD_ACT_SENDTO: - if (iofd->io_ops.sendto_cb) - iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa); - break; - case IOFD_ACT_SENDMSG: - if (iofd->io_ops.sendmsg_cb) - iofd->io_ops.sendmsg_cb(iofd, rc, msg); - break; - default: - OSMO_ASSERT(0); - } + for (idx = 0; idx < msghdr->io_len; idx++) { + struct msgb *msg = msghdr->msg[idx]; + int chunk; - msgb_free(msghdr->msg[0]); + /* Incomplete write */ + if (rc > 0 && rc < msgb_length(msg)) { + /* Keep msg with unsent data only. */ + msgb_pull(msg, rc); + msghdr->iov[idx].iov_len = msgb_length(msg); + /* Shift all existing buffers down. */ + if (idx) { + msghdr->io_len -= idx; + for (i = 0; i < msghdr->io_len; i++) { + msghdr->iov[i] = msghdr->iov[idx + i]; + msghdr->msg[i] = msghdr->msg[idx + i]; + } + for (i = 0; i < idx; i++) { + memset(&msghdr->iov[msghdr->io_len + i], 0, sizeof(struct iovec)); + msghdr->msg[msghdr->io_len + i] = NULL; + } + msghdr->hdr.msg_iovlen = msghdr->io_len; + } + /* Re-enqueue remaining buffers. 
*/ + iofd_txqueue_enqueue_front(iofd, msghdr); + return; + } + + if (rc >= 0) { + chunk = msgb_length(msg); + if (rc < chunk) + chunk = rc; + } else { + chunk = rc; + } + + /* All other failure and success cases are handled here */ + switch (msghdr->action) { + case IOFD_ACT_WRITE: + if (iofd->io_ops.write_cb) + iofd->io_ops.write_cb(iofd, chunk, msg); + break; + case IOFD_ACT_SENDTO: + if (iofd->io_ops.sendto_cb) + iofd->io_ops.sendto_cb(iofd, chunk, msg, &msghdr->osa); + break; + case IOFD_ACT_SENDMSG: + if (iofd->io_ops.sendmsg_cb) + iofd->io_ops.sendmsg_cb(iofd, chunk, msg); + break; + default: + OSMO_ASSERT(0); + } + + msgb_free(msghdr->msg[idx]); + msghdr->msg[idx] = NULL; + + /* The user can unregister/close the iofd during callback above. */ + if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_FD_REGISTERED)) + break; + } iofd_msghdr_free(msghdr); } @@ -475,6 +507,8 @@ int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg) { int rc; + struct iofd_msghdr *msghdr; + int idx; if (OSMO_UNLIKELY(msgb_length(msg) == 0)) { LOGPIO(iofd, LOGL_ERROR, "Length is 0, rejecting msgb.\n"); @@ -483,21 +517,35 @@ OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE); - struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0); - if (!msghdr) - return -ENOMEM; + /* Always try to add msg to last msghdr. Only if it is completely filled, allocate a new msghdr. + * This way all the previous meghdrs in the queue are completely filled. */ + msghdr = llist_last_entry_or_null(&iofd->tx_queue.msg_queue, struct iofd_msghdr, list); + if (msghdr && msghdr->io_len < iofd->io_write_buffers) { + /* Add msg to existing msghdr. */ + msghdr->msg[msghdr->io_len++] = msg; + } else { + /* Create new msghdr with msg. 
*/ + msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0); + if (!msghdr) + return -ENOMEM; + msghdr->hdr.msg_iov = &msghdr->iov[0]; + msghdr->flags = MSG_NOSIGNAL; + } - msghdr->flags = MSG_NOSIGNAL; - msghdr->iov[0].iov_base = msgb_data(msghdr->msg[0]); - msghdr->iov[0].iov_len = msgb_length(msghdr->msg[0]); - msghdr->hdr.msg_iov = &msghdr->iov[0]; - msghdr->hdr.msg_iovlen = 1; + /* Add set IO vector to msg. */ + idx = msghdr->io_len - 1; + msghdr->iov[idx].iov_base = msgb_data(msg); + msghdr->iov[idx].iov_len = msgb_length(msg); + msghdr->hdr.msg_iovlen = msghdr->io_len; - rc = iofd_txqueue_enqueue(iofd, msghdr); - if (rc < 0) { - iofd_msghdr_free(msghdr); - LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). Rejecting msgb\n", rc); - return rc; + /* Only new msghdr will be enqueued. */ + if (msghdr->io_len == 1) { + rc = iofd_txqueue_enqueue(iofd, msghdr); + if (rc < 0) { + iofd_msghdr_free(msghdr); + LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). Rejecting msgb\n", rc); + return rc; + } } return 0; @@ -884,7 +932,10 @@ { struct iofd_msghdr *hdr; while ((hdr = iofd_txqueue_dequeue(iofd))) { - msgb_free(hdr->msg[0]); + for (int idx = 0; idx < hdr->io_len; idx++) { + msgb_free(hdr->msg[idx]); + hdr->msg[idx] = NULL; + } iofd_msghdr_free(hdr); } } diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c index 754a39a..018bdab 100644 --- a/src/core/osmo_io_uring.c +++ b/src/core/osmo_io_uring.c @@ -262,7 +262,10 @@ iofd->u.uring.write_msghdr = NULL; if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) { - msgb_free(msghdr->msg[0]); + for (int idx = 0; idx < msghdr->io_len; idx++) { + msgb_free(msghdr->msg[idx]); + msghdr->msg[idx] = NULL; + } iofd_msghdr_free(msghdr); } else { iofd_handle_send_completion(iofd, rc, msghdr); @@ -351,7 +354,7 @@ switch (msghdr->action) { case IOFD_ACT_WRITE: - io_uring_prep_writev(sqe, msghdr->iofd->fd, msghdr->iov, 1, -1); + io_uring_prep_writev(sqe, msghdr->iofd->fd, msghdr->iov, msghdr->io_len, 
-1); break; case IOFD_ACT_SENDTO: case IOFD_ACT_SENDMSG: @@ -458,7 +461,10 @@ LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n"); iofd->u.uring.write_msghdr = NULL; talloc_steal(OTC_GLOBAL, msghdr); - msgb_free(msghdr->msg[0]); + for (int idx = 0; idx < msghdr->io_len; idx++) { + msgb_free(msghdr->msg[idx]); + msghdr->msg[idx] = NULL; + } msghdr->iofd = NULL; io_uring_prep_cancel(sqe, msghdr, 0); } -- To view, visit https://gerrit.osmocom.org/c/libosmocore/+/40493?usp=email To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: libosmocore Gerrit-Branch: master Gerrit-Change-Id: I8c4e0a785cf66becd7fb5b2caf718c9724b56686 Gerrit-Change-Number: 40493 Gerrit-PatchSet: 14 Gerrit-Owner: jolly <andr...@eversberg.eu> Gerrit-Reviewer: Jenkins Builder Gerrit-Reviewer: laforge <lafo...@osmocom.org> Gerrit-Reviewer: pespin <pes...@sysmocom.de>