jolly has uploaded this change for review. ( https://gerrit.osmocom.org/c/libosmocore/+/40856?usp=email )
Change subject: Put all io_uring related read and write states into sub structures ...................................................................... Put all io_uring related read and write states into sub structures Related: OS#6705 Change-Id: I3cf4ab6d9aebf5532ad174c90c7b0e9633491c88 --- M src/core/osmo_io.c M src/core/osmo_io_internal.h M src/core/osmo_io_uring.c 3 files changed, 69 insertions(+), 62 deletions(-) git pull ssh://gerrit.osmocom.org:29418/libosmocore refs/changes/56/40856/1 diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c index 6a5dcce..6e19231 100644 --- a/src/core/osmo_io.c +++ b/src/core/osmo_io.c @@ -893,7 +893,7 @@ if (sqes < 1 || sqes > IOFD_MSGHDR_MAX_READ_SQES) return -EINVAL; - iofd->u.uring.num_read_sqes = sqes; + iofd->u.uring.read.num_sqes = sqes; return 0; } diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h index 3d92b85..b417e37 100644 --- a/src/core/osmo_io_internal.h +++ b/src/core/osmo_io_internal.h @@ -107,19 +107,26 @@ struct osmo_fd ofd; } poll; struct { - bool read_enabled; - bool write_enabled; - /*! requested number of simultaniously submitted read SQEs */ - uint8_t num_read_sqes; - /*! array of simultaneously submitted read SQEs */ - void *read_msghdr[IOFD_MSGHDR_MAX_READ_SQES]; - /*! ring the read SQEs have been submitted to */ - struct io_uring *read_ring; - /*! current number of simultaneously submitted read SQEs */ - uint8_t reads_submitted; - void *write_msghdr; - /*! ring the write SQE has been submitted to */ - struct io_uring *write_ring; + struct { + /*! read is enabled, due to registration of callback function */ + bool enabled; + /*! requested number of simultaniously submitted read SQEs */ + uint8_t num_sqes; + /*! array of simultaneously submitted read SQEs */ + void *msghdr[IOFD_MSGHDR_MAX_READ_SQES]; + /*! ring the read SQEs have been submitted to */ + struct io_uring *ring; + /*! current number of simultaneously submitted read SQEs */ + uint8_t sqes_submitted; + } read; + struct { + /*! write is enabled, due to pending msghdr in tx_queue */ + bool enabled; + /*! submitted write SQE */ + void *msghdr; + /*! ring the write SQE has been submitted to */ + struct io_uring *ring; + } write; /* TODO: index into array of registered fd's? */ /* osmo_fd for non-blocking connect handling */ struct osmo_fd connect_ofd; diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c index a867968..b414c0d 100644 --- a/src/core/osmo_io_uring.c +++ b/src/core/osmo_io_uring.c @@ -232,13 +232,13 @@ uint8_t idx; /* All subsequent read SQEs must be on the same ring. */ - if (iofd->u.uring.reads_submitted > 0 && iofd->u.uring.read_ring != &g_ring->ring) + if (iofd->u.uring.read.sqes_submitted > 0 && iofd->u.uring.read.ring != &g_ring->ring) return -EINVAL; /* Tell iofd_uring_get_sqe() not to allocate a new ring, if we want to enqueue multiple read SQEs. */ - sqe = iofd_uring_get_sqe(iofd->u.uring.reads_submitted > 0); + sqe = iofd_uring_get_sqe(iofd->u.uring.read.sqes_submitted > 0); if (!sqe) { - if (iofd->u.uring.reads_submitted > 0) + if (iofd->u.uring.read.sqes_submitted > 0) return -EINVAL; LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n"); OSMO_ASSERT(0); @@ -293,9 +293,9 @@ iofd_io_uring_submit(); - iofd->u.uring.read_msghdr[iofd->u.uring.reads_submitted] = msghdr; - iofd->u.uring.reads_submitted++; - iofd->u.uring.read_ring = &g_ring->ring; + iofd->u.uring.read.msghdr[iofd->u.uring.read.sqes_submitted] = msghdr; + iofd->u.uring.read.sqes_submitted++; + iofd->u.uring.read.ring = &g_ring->ring; return 0; } @@ -305,7 +305,7 @@ int rc; /* Submit more read SQEs in advance, if requested. */ - while (iofd->u.uring.reads_submitted < ((iofd->u.uring.num_read_sqes) ? : g_io_uring_read_sqes)) { + while (iofd->u.uring.read.sqes_submitted < ((iofd->u.uring.read.num_sqes) ? : g_io_uring_read_sqes)) { rc = iofd_uring_submit_recv_sqe(iofd, action); /* Stop, if we cannot enqueue multiple read SQEs in the same ring. */ if (rc < 0) @@ -320,19 +320,19 @@ uint8_t idx, i; /* Find which read_msghdr is completed and remove from list. */ - for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) { - if (iofd->u.uring.read_msghdr[idx] == msghdr) + for (idx = 0; idx < iofd->u.uring.read.sqes_submitted; idx++) { + if (iofd->u.uring.read.msghdr[idx] == msghdr) break; } - if (idx == iofd->u.uring.reads_submitted) { + if (idx == iofd->u.uring.read.sqes_submitted) { LOGP(DLIO, LOGL_FATAL, "Read SQE completion, but msghdr not found, please fix!\n"); return; } /* Remove entry at idx. */ - iofd->u.uring.reads_submitted--; - for (i = idx; i < iofd->u.uring.reads_submitted; i++) - iofd->u.uring.read_msghdr[i] = iofd->u.uring.read_msghdr[i + 1]; - iofd->u.uring.read_msghdr[i] = NULL; + iofd->u.uring.read.sqes_submitted--; + for (i = idx; i < iofd->u.uring.read.sqes_submitted; i++) + iofd->u.uring.read.msghdr[i] = iofd->u.uring.read.msghdr[i + 1]; + iofd->u.uring.read.msghdr[i] = NULL; for (idx = 0; idx < msghdr->io_len; idx++) { struct msgb *msg = msghdr->msg[idx]; @@ -351,7 +351,7 @@ } /* Check for every iteration, because iofd might get unregistered/closed during receive function. */ - if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) + if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) iofd_handle_recv(iofd, msg, chunk, msghdr); else msgb_free(msg); @@ -364,7 +364,7 @@ msghdr->msg[idx] = NULL; } - if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) + if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) iofd_uring_submit_recv(iofd, msghdr->action); iofd_msghdr_free(msghdr); @@ -380,12 +380,12 @@ /* Detach msghdr from iofd. It might get freed here or it is freed during iofd_handle_send_completion(). * If there is pending data to send, iofd_uring_submit_tx() will attach it again. * iofd_handle_send_completion() will invoke a callback function to signal the possibility of write/send. - * This callback function might close iofd, leading to the potential freeing of iofd->u.uring.write_msghdr if + * This callback function might close iofd, leading to the potential freeing of iofd->u.uring.write.msghdr if * still attached. Since iofd_handle_send_completion() frees msghdr at the end of the function, detaching * msghdr here prevents a double-free bug. */ - if (iofd->u.uring.write_msghdr == msghdr) { - iofd->u.uring.write_msghdr = NULL; - iofd->u.uring.write_ring = NULL; + if (iofd->u.uring.write.msghdr == msghdr) { + iofd->u.uring.write.msghdr = NULL; + iofd->u.uring.write.ring = NULL; } if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) { @@ -399,7 +399,7 @@ } /* submit the next to-be-transmitted message for this file descriptor */ - if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) + if (iofd->u.uring.write.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) iofd_uring_submit_tx(iofd); } @@ -427,7 +427,7 @@ IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK); - if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr) + if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read.sqes_submitted && !iofd->u.uring.write.msghdr) talloc_free(iofd); } @@ -518,8 +518,8 @@ iofd_io_uring_submit(); - iofd->u.uring.write_msghdr = msghdr; - iofd->u.uring.write_ring = &g_ring->ring; + iofd->u.uring.write.msghdr = msghdr; + iofd->u.uring.write.ring = &g_ring->ring; return 0; } @@ -558,12 +558,12 @@ IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK); /* If write/read notifications are pending, enable it now. */ - if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) + if (iofd->u.uring.write.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) iofd_uring_write_enable(iofd); - if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) + if (iofd->u.uring.read.enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED)) iofd_uring_read_enable(iofd); - if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.reads_submitted && !iofd->u.uring.write_msghdr) + if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read.sqes_submitted && !iofd->u.uring.write.msghdr) talloc_free(iofd); return 0; } @@ -595,10 +595,10 @@ struct iofd_msghdr *msghdr; uint8_t idx; - for (idx = 0; idx < iofd->u.uring.reads_submitted; idx++) { - struct osmo_io_uring *ring = container_of(iofd->u.uring.read_ring, struct osmo_io_uring, ring); - msghdr = iofd->u.uring.read_msghdr[idx]; - iofd->u.uring.read_msghdr[idx] = NULL; + for (idx = 0; idx < iofd->u.uring.read.sqes_submitted; idx++) { + struct osmo_io_uring *ring = container_of(iofd->u.uring.read.ring, struct osmo_io_uring, ring); + msghdr = iofd->u.uring.read.msghdr[idx]; + iofd->u.uring.read.msghdr[idx] = NULL; /* Submit SQEs of the current ring, if needed. */ if (&ring->ring == &g_ring->ring) osmo_io_uring_submit(); @@ -617,15 +617,15 @@ talloc_steal(OTC_GLOBAL, msghdr); msghdr->iofd = NULL; } - if (iofd->u.uring.reads_submitted) { - iofd->u.uring.read_ring = NULL; - iofd->u.uring.reads_submitted = 0; + if (iofd->u.uring.read.sqes_submitted) { + iofd->u.uring.read.ring = NULL; + iofd->u.uring.read.sqes_submitted = 0; } - if (iofd->u.uring.write_msghdr) { - struct osmo_io_uring *ring = container_of(iofd->u.uring.write_ring, struct osmo_io_uring, ring); - msghdr = iofd->u.uring.write_msghdr; - iofd->u.uring.write_msghdr = NULL; + if (iofd->u.uring.write.msghdr) { + struct osmo_io_uring *ring = container_of(iofd->u.uring.write.ring, struct osmo_io_uring, ring); + msghdr = iofd->u.uring.write.msghdr; + iofd->u.uring.write.msghdr = NULL; for (int idx = 0; idx < msghdr->io_len; idx++) { msgb_free(msghdr->msg[idx]); msghdr->msg[idx] = NULL; @@ -647,7 +647,7 @@ } talloc_steal(OTC_GLOBAL, msghdr); msghdr->iofd = NULL; - iofd->u.uring.write_ring = NULL; + iofd->u.uring.write.ring = NULL; } if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_NOTIFY_CONNECTED)) { @@ -660,9 +660,9 @@ static void iofd_uring_write_enable(struct osmo_io_fd *iofd) { - iofd->u.uring.write_enabled = true; + iofd->u.uring.write.enabled = true; - if (iofd->u.uring.write_msghdr) + if (iofd->u.uring.write.msghdr) return; /* This function is called again, once the socket is connected. */ @@ -699,21 +699,21 @@ iofd_io_uring_submit(); - iofd->u.uring.write_msghdr = msghdr; - iofd->u.uring.write_ring = &g_ring->ring; + iofd->u.uring.write.msghdr = msghdr; + iofd->u.uring.write.ring = &g_ring->ring; } } static void iofd_uring_write_disable(struct osmo_io_fd *iofd) { - iofd->u.uring.write_enabled = false; + iofd->u.uring.write.enabled = false; } static void iofd_uring_read_enable(struct osmo_io_fd *iofd) { - iofd->u.uring.read_enabled = true; + iofd->u.uring.read.enabled = true; - if (iofd->u.uring.reads_submitted) + if (iofd->u.uring.read.sqes_submitted) return; /* This function is called again, once the socket is connected. */ @@ -737,7 +737,7 @@ static void iofd_uring_read_disable(struct osmo_io_fd *iofd) { - iofd->u.uring.read_enabled = false; + iofd->u.uring.read.enabled = false; } static int iofd_uring_close(struct osmo_io_fd *iofd) @@ -753,7 +753,7 @@ } /* OSMO_IO_FD_MODE_RECVMSG_SENDMSG: Don't call this function after enabling read or write. */ - OSMO_ASSERT(!iofd->u.uring.write_enabled && !iofd->u.uring.read_enabled); + OSMO_ASSERT(!iofd->u.uring.write.enabled && !iofd->u.uring.read.enabled); /* Set flag to enable temporary osmo_fd during register() time: */ IOFD_FLAG_SET(iofd, IOFD_FLAG_NOTIFY_CONNECTED); -- To view, visit https://gerrit.osmocom.org/c/libosmocore/+/40856?usp=email To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings?usp=email Gerrit-MessageType: newchange Gerrit-Project: libosmocore Gerrit-Branch: master Gerrit-Change-Id: I3cf4ab6d9aebf5532ad174c90c7b0e9633491c88 Gerrit-Change-Number: 40856 Gerrit-PatchSet: 1 Gerrit-Owner: jolly <andr...@eversberg.eu>