Implement packet receive and transmit using io_uring asynchronous I/O,
with full support for both single-segment and multi-segment mbufs.

Rx path:
  - rtap_rx_alloc() chains multiple mbufs when the MTU exceeds a
    single mbuf's tailroom capacity
  - Pre-post read/readv requests to the io_uring submission queue,
    each backed by a pre-allocated (possibly chained) mbuf
  - On rx_burst, harvest completed CQEs and replace each consumed
    mbuf with a freshly allocated one
  - rtap_rx_adjust() distributes received data across segments and
    frees unused trailing segments
  - Parse the prepended virtio-net header and translate its checksum
    and GSO fields into mbuf Rx offload flags

Tx path:
  - For single-segment mbufs, use io_uring write and batch submits
  - For multi-segment mbufs, use writev via io_uring with immediate
    submit (iovec is stack-allocated)
  - When the mbuf headroom is not writable (shared or indirect),
    chain a new header mbuf for the virtio-net header
  - Prepend a virtio-net header filled in from the mbuf Tx offload flags
  - Clean completed tx CQEs to free transmitted mbufs

Add io_uring cancel-all logic using IORING_ASYNC_CANCEL_ALL for
clean queue teardown, draining all pending CQEs and freeing mbufs.

Signed-off-by: Stephen Hemminger <[email protected]>
---
 doc/guides/nics/features/rtap.ini |   1 +
 drivers/net/rtap/meson.build      |   1 +
 drivers/net/rtap/rtap.h           |  13 +
 drivers/net/rtap/rtap_ethdev.c    |   7 +
 drivers/net/rtap/rtap_rxtx.c      | 755 ++++++++++++++++++++++++++++++
 5 files changed, 777 insertions(+)
 create mode 100644 drivers/net/rtap/rtap_rxtx.c

diff --git a/doc/guides/nics/features/rtap.ini b/doc/guides/nics/features/rtap.ini
index ed7c638029..c064e1e0b9 100644
--- a/doc/guides/nics/features/rtap.ini
+++ b/doc/guides/nics/features/rtap.ini
@@ -4,6 +4,7 @@
 ; Refer to default.ini for the full list of available PMD features.
 ;
 [Features]
+Scattered Rx         = P
 Linux                = Y
 ARMv7                = Y
 ARMv8                = Y
diff --git a/drivers/net/rtap/meson.build b/drivers/net/rtap/meson.build
index 7bd7806ef3..8e2b15f382 100644
--- a/drivers/net/rtap/meson.build
+++ b/drivers/net/rtap/meson.build
@@ -19,6 +19,7 @@ endif
 
 sources = files(
         'rtap_ethdev.c',
+        'rtap_rxtx.c',
 )
 
 ext_deps += liburing
diff --git a/drivers/net/rtap/rtap.h b/drivers/net/rtap/rtap.h
index 39a3188a7b..a0bbb1a8a0 100644
--- a/drivers/net/rtap/rtap.h
+++ b/drivers/net/rtap/rtap.h
@@ -70,4 +70,17 @@ struct rtap_pmd {
 int rtap_queue_open(struct rte_eth_dev *dev, uint16_t queue_id);
 void rtap_queue_close(struct rte_eth_dev *dev, uint16_t queue_id);
 
+/* rtap_rxtx.c */
+uint16_t rtap_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts);
+uint16_t rtap_tx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts);
+int rtap_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+                       uint16_t nb_rx_desc, unsigned int socket_id,
+                       const struct rte_eth_rxconf *rx_conf,
+                       struct rte_mempool *mb_pool);
+void rtap_rx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id);
+int rtap_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+                       uint16_t nb_tx_desc, unsigned int socket_id,
+                       const struct rte_eth_txconf *tx_conf);
+void rtap_tx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id);
+
 #endif /* _RTAP_H_ */
diff --git a/drivers/net/rtap/rtap_ethdev.c b/drivers/net/rtap/rtap_ethdev.c
index 4e7847ff8d..a65a8b77ad 100644
--- a/drivers/net/rtap/rtap_ethdev.c
+++ b/drivers/net/rtap/rtap_ethdev.c
@@ -232,6 +232,10 @@ static const struct eth_dev_ops rtap_ops = {
        .dev_stop               = rtap_dev_stop,
        .dev_configure          = rtap_dev_configure,
        .dev_close              = rtap_dev_close,
+       .rx_queue_setup         = rtap_rx_queue_setup,
+       .rx_queue_release       = rtap_rx_queue_release,
+       .tx_queue_setup         = rtap_tx_queue_setup,
+       .tx_queue_release       = rtap_tx_queue_release,
 };
 
 static int
@@ -272,6 +276,9 @@ rtap_create(struct rte_eth_dev *dev, const char *tap_name, uint8_t persist)
 
        PMD_LOG(DEBUG, "%s setup", ifr.ifr_name);
 
+       dev->rx_pkt_burst = rtap_rx_burst;
+       dev->tx_pkt_burst = rtap_tx_burst;
+
        return 0;
 
 error:
diff --git a/drivers/net/rtap/rtap_rxtx.c b/drivers/net/rtap/rtap_rxtx.c
new file mode 100644
index 0000000000..c972ab4ca0
--- /dev/null
+++ b/drivers/net/rtap/rtap_rxtx.c
@@ -0,0 +1,755 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2026 Stephen Hemminger
+ */
+
+#include <assert.h>
+#include <errno.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <liburing.h>
+#include <sys/uio.h>
+#include <linux/virtio_net.h>
+
+#include <rte_common.h>
+#include <rte_ethdev.h>
+#include <rte_ip.h>
+#include <rte_mbuf.h>
+#include <rte_net.h>
+#include <rte_malloc.h>
+
+#include "rtap.h"
+
+/*
+ * The virtio net header is stored in the mbuf headroom, immediately
+ * before the packet data, so the configured pktmbuf headroom must be
+ * large enough to hold it.
+ */
+static_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct virtio_net_hdr),
+             "Pktmbuf headroom not big enough for virtio header");
+
+
+/*
+ * Look up the file descriptor for a queue in the port's
+ * process_private array; it is used for both transmit and receive.
+ */
+static inline int
+rtap_queue_fd(uint16_t port_id, uint16_t queue_id)
+{
+       const int *queue_fds = rte_eth_devices[port_id].process_private;
+       const int queue_fd = queue_fds[queue_id];
+
+       /* callers are expected to ask only for queues with a valid fd */
+       RTE_ASSERT(queue_fd != -1);
+       return queue_fd;
+}
+
+/*
+ * Queue a read of packet data into an mbuf on the Rx submission ring.
+ * The virtio net header is read into the headroom just before the data.
+ * A multi-segment mbuf requires readv().
+ * Return:
+ *   -ENOSPC : no submit queue element available.
+ *   0 : regular read queued but not submitted, caller should call
+ *       io_uring_submit later to batch.
+ *   otherwise : readv was used and the result of the immediate
+ *       io_uring_submit is returned: the number of submitted sqe's
+ *       (which includes any reads batched earlier) or -errno.
+ */
+static inline int
+rtap_rx_submit(struct rtap_rx_queue *rxq, int fd, struct rte_mbuf *mb)
+{
+       struct io_uring_sqe *sqe = io_uring_get_sqe(&rxq->io_ring);
+
+       if (unlikely(sqe == NULL))
+               return -ENOSPC;
+
+       io_uring_sqe_set_data(sqe, mb);
+
+       RTE_ASSERT(rte_pktmbuf_headroom(mb) >= sizeof(struct virtio_net_hdr));
+
+       /* The first buffer starts at the virtio header in the headroom */
+       void *first = rte_pktmbuf_mtod_offset(mb, void *,
+                                             -sizeof(struct virtio_net_hdr));
+       unsigned int first_len = sizeof(struct virtio_net_hdr) +
+                                rte_pktmbuf_tailroom(mb);
+       uint16_t nsegs = mb->nb_segs;
+
+       /* optimize for the case where packet fits in one mbuf */
+       if (nsegs == 1) {
+               io_uring_prep_read(sqe, fd, first, first_len, 0);
+               /* caller will submit as batch */
+               return 0;
+       }
+
+       struct iovec iovs[RTE_MBUF_MAX_NB_SEGS];
+       struct rte_mbuf *seg = mb;
+
+       RTE_ASSERT(nsegs > 0 && nsegs < IOV_MAX);
+
+       iovs[0].iov_base = first;
+       iovs[0].iov_len = first_len;
+
+       /* one iovec per additional segment, covering its whole tailroom */
+       for (uint16_t idx = 1; idx < nsegs; idx++) {
+               seg = seg->next;
+               iovs[idx].iov_base = rte_pktmbuf_mtod(seg, void *);
+               iovs[idx].iov_len = rte_pktmbuf_tailroom(seg);
+       }
+       io_uring_prep_readv(sqe, fd, iovs, nsegs, 0);
+
+       /*
+        * iovs[] lives on this stack frame and must stay valid until
+        * the request is submitted, so submit before returning.
+        */
+       return io_uring_submit(&rxq->io_ring);
+}
+
+/*
+ * Allocate an mbuf (chaining more segments as needed) whose combined
+ * tailroom covers MTU plus Ethernet header for the queue's port.
+ * Returns NULL, with any partial chain freed, if the pool runs dry.
+ */
+static struct rte_mbuf *
+rtap_rx_alloc(struct rtap_rx_queue *rxq)
+{
+       const struct rte_eth_dev *dev = &rte_eth_devices[rxq->port_id];
+       int remaining = dev->data->mtu + RTE_ETHER_HDR_LEN;
+       struct rte_mbuf *head = NULL;
+       struct rte_mbuf **link = &head;
+
+       for (;;) {
+               struct rte_mbuf *seg = rte_pktmbuf_alloc(rxq->mb_pool);
+
+               if (unlikely(seg == NULL)) {
+                       rte_pktmbuf_free(head);
+                       return NULL;
+               }
+
+               /* append to the chain; the head counts the extra segments */
+               *link = seg;
+               link = &seg->next;
+               if (head != seg)
+                       head->nb_segs++;
+
+               remaining -= rte_pktmbuf_tailroom(seg);
+               if (remaining <= 0)
+                       break;
+       }
+
+       __rte_mbuf_sanity_check(head, 1);
+       return head;
+}
+
+/*
+ * Spread a received length over the segments of a chained mbuf:
+ * set pkt_len on the head, data_len on each segment that holds data,
+ * and free the segments left over after the end of the packet.
+ * Returns 0 on success, -1 if the chain cannot hold len bytes.
+ */
+static inline int
+rtap_rx_adjust(struct rte_mbuf *mb, uint32_t len)
+{
+       struct rte_mbuf *seg = mb;
+       uint32_t remaining = len;
+       uint16_t used = 0;
+
+       mb->pkt_len = len;
+
+       while (seg != NULL && remaining > 0) {
+               uint16_t chunk = RTE_MIN(remaining, rte_pktmbuf_tailroom(seg));
+
+               seg->data_len = chunk;
+               remaining -= chunk;
+               ++used;
+
+               if (remaining > 0) {
+                       seg = seg->next;
+                       continue;
+               }
+
+               /* This segment ends the packet: detach and free the rest */
+               struct rte_mbuf *unused = seg->next;
+
+               seg->next = NULL;
+               /* Free segments one by one to avoid nb_segs issues */
+               while (unused != NULL) {
+                       struct rte_mbuf *following = unused->next;
+
+                       rte_pktmbuf_free_seg(unused);
+                       unused = following;
+               }
+
+               mb->nb_segs = used;
+               return 0;
+       }
+
+       /* Packet was truncated - not enough mbuf space */
+       return -1;
+}
+
+/*
+ * Set the receive offload flags of received mbuf
+ * based on the bits in the virtio network header.
+ *
+ * Returns 0 on success, or -EINVAL when the header describes a
+ * checksum region outside the packet or an unsupported GSO mode.
+ */
+static int
+rtap_rx_offload(struct rte_mbuf *m, const struct virtio_net_hdr *hdr)
+{
+       uint32_t ptype;
+       bool l4_supported = false;
+       struct rte_net_hdr_lens hdr_lens;
+
+       /* nothing to do */
+       if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE)
+               return 0;
+
+       m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN;
+
+       ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK);
+       m->packet_type = ptype;
+       if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP ||
+           (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP ||
+           (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP)
+               l4_supported = true;
+
+       if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
+               uint32_t hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len;
+
+               if (hdr->csum_start <= hdrlen && l4_supported) {
+                       m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE;
+               } else {
+                       /* Unknown proto or tunnel, do sw cksum. */
+                       uint16_t csum = 0;
+                       uint32_t off;
+
+                       /*
+                        * A start offset beyond the packet would make the
+                        * length below wrap around to a huge value.
+                        */
+                       if (hdr->csum_start > rte_pktmbuf_pkt_len(m))
+                               return -EINVAL;
+
+                       if (rte_raw_cksum_mbuf(m, hdr->csum_start,
+                                              rte_pktmbuf_pkt_len(m) - hdr->csum_start,
+                                              &csum) < 0)
+                               return -EINVAL;
+                       if (likely(csum != 0xffff))
+                               csum = ~csum;
+
+                       /* Store only if the field lies in the first segment */
+                       off = (uint32_t)hdr->csum_offset + hdr->csum_start;
+                       if (rte_pktmbuf_data_len(m) >= off + sizeof(uint16_t))
+                               memcpy(rte_pktmbuf_mtod_offset(m, void *, off),
+                                      &csum, sizeof(csum));
+               }
+       } else if ((hdr->flags & VIRTIO_NET_HDR_F_DATA_VALID) && l4_supported) {
+               m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_GOOD;
+       }
+
+       /* GSO request, save required information in mbuf */
+       if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) {
+               /* Check unsupported modes */
+               if ((hdr->gso_type & VIRTIO_NET_HDR_GSO_ECN) || hdr->gso_size == 0)
+                       return -EINVAL;
+
+               /* Update mss lengths in mbuf */
+               m->tso_segsz = hdr->gso_size;
+               switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
+               case VIRTIO_NET_HDR_GSO_TCPV4:
+               case VIRTIO_NET_HDR_GSO_TCPV6:
+                       m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE;
+                       break;
+               default:
+                       return -EINVAL;
+               }
+       }
+
+       return 0;
+}
+
+/*
+ * Receive burst: harvest completed reads from the io_uring completion
+ * queue, hand the filled mbufs to the caller and post a replacement
+ * mbuf for every completion that was consumed.
+ *
+ * Returns the number of packets stored in bufs[] (at most nb_pkts).
+ */
+uint16_t
+rtap_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+       struct rtap_rx_queue *rxq = queue;
+       struct io_uring_cqe *cqe;
+       unsigned int head, num_cqe = 0, num_sqe = 0;
+       uint16_t num_rx = 0;
+       uint32_t num_bytes = 0;
+       int fd = rtap_queue_fd(rxq->port_id, rxq->queue_id);
+
+       if (unlikely(nb_pkts == 0))
+               return 0;
+
+       io_uring_for_each_cqe(&rxq->io_ring, head, cqe) {
+               struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data;
+               struct rte_mbuf *nmb;
+               struct virtio_net_hdr *hdr;
+               ssize_t len = cqe->res;
+               int n;
+
+               PMD_RX_LOG(DEBUG, "complete m=%p len=%zd", mb, len);
+
+               num_cqe++;
+
+               if (unlikely(len < (ssize_t)(sizeof(*hdr) + RTE_ETHER_HDR_LEN))) {
+                       if (len < 0)
+                               PMD_RX_LOG(ERR, "io_uring_read: %s", strerror(-len));
+                       else
+                               PMD_RX_LOG(ERR, "io_uring_read len %zd", len);
+                       rxq->rx_errors++;
+
+                       /*
+                        * Nothing usable was received and the mbuf lengths
+                        * have not been modified: post the same mbuf again
+                        * instead of leaking it.
+                        */
+                       nmb = mb;
+                       goto resubmit;
+               }
+
+               /* virtio header is before packet data */
+               hdr = rte_pktmbuf_mtod_offset(mb, struct virtio_net_hdr *, -sizeof(*hdr));
+               len -= sizeof(*hdr);
+
+               /* Replacement mbuf for resubmitting */
+               nmb = rtap_rx_alloc(rxq);
+               if (unlikely(nmb == NULL)) {
+                       struct rte_eth_dev *dev = &rte_eth_devices[rxq->port_id];
+
+                       PMD_RX_LOG(ERR, "Rx mbuf alloc failed");
+                       dev->data->rx_mbuf_alloc_failed++;
+
+                       nmb = mb;        /* Reuse original */
+                       goto resubmit;
+               }
+
+               if (mb->nb_segs == 1) {
+                       mb->data_len = len;
+                       mb->pkt_len = len;
+               } else {
+                       if (unlikely(rtap_rx_adjust(mb, len) < 0)) {
+                               PMD_RX_LOG(ERR, "packet truncated: pkt_len=%u exceeds mbuf capacity",
+                                          mb->pkt_len);
+                               ++rxq->rx_errors;
+                               rte_pktmbuf_free(mb);
+                               goto resubmit;
+                       }
+               }
+
+               if (unlikely(rtap_rx_offload(mb, hdr) < 0)) {
+                       PMD_RX_LOG(ERR, "invalid rx offload");
+                       ++rxq->rx_errors;
+                       rte_pktmbuf_free(mb);
+                       goto resubmit;
+               }
+
+               mb->port = rxq->port_id;
+
+               __rte_mbuf_sanity_check(mb, 1);
+               num_bytes += mb->pkt_len;
+               bufs[num_rx++] = mb;
+
+resubmit:
+               /* Submit the replacement mbuf */
+               n = rtap_rx_submit(rxq, fd, nmb);
+               if (unlikely(n < 0)) {
+                       /* Hope that later Rx can recover */
+                       PMD_RX_LOG(ERR, "io_uring no Rx sqe: %s", strerror(-n));
+                       rxq->rx_errors++;
+                       rte_pktmbuf_free(nmb);
+                       break;
+               }
+
+               /* If using readv() then n > 0 and all sqe's have been queued. */
+               if (n > 0)
+                       num_sqe = 0;
+               else
+                       ++num_sqe;
+
+               if (num_rx == nb_pkts)
+                       break;
+       }
+       if (num_cqe > 0)
+               io_uring_cq_advance(&rxq->io_ring, num_cqe);
+
+       /* Push out the reads that were only queued, not yet submitted */
+       if (num_sqe > 0) {
+               int n = io_uring_submit(&rxq->io_ring);
+
+               if (unlikely(n < 0)) {
+                       PMD_LOG(ERR, "Rx io_uring submit failed: %s", strerror(-n));
+               } else if (unlikely(n != (int)num_sqe)) {
+                       PMD_RX_LOG(NOTICE, "Rx io_uring %d of %u resubmitted", n, num_sqe);
+               }
+       }
+
+       rxq->rx_packets += num_rx;
+       rxq->rx_bytes += num_bytes;
+
+       return num_rx;
+}
+
+/*
+ * Set up an Rx queue: allocate the queue structure, create its
+ * io_uring, open the queue's TAP fd and pre-post one read per
+ * descriptor, each backed by a freshly allocated mbuf.
+ *
+ * Returns 0 on success, -1 on failure. On failure everything acquired
+ * here is released and the queue slot in dev->data is cleared.
+ */
+int
+rtap_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id, uint16_t nb_rx_desc,
+                   unsigned int socket_id,
+                   const struct rte_eth_rxconf *rx_conf __rte_unused,
+                   struct rte_mempool *mb_pool)
+{
+       struct rte_mbuf **mbufs = NULL;
+       unsigned int nsqe = 0;
+       int fd = -1;
+       int ret;
+
+       PMD_LOG(DEBUG, "setup port %u queue %u rx_descriptors %u",
+               dev->data->port_id, queue_id, nb_rx_desc);
+
+       struct rtap_rx_queue *rxq = rte_zmalloc_socket(NULL, sizeof(*rxq),
+                                                      RTE_CACHE_LINE_SIZE, socket_id);
+       if (rxq == NULL) {
+               PMD_LOG(ERR, "rxq alloc failed");
+               return -1;
+       }
+
+       rxq->mb_pool = mb_pool;
+       rxq->port_id = dev->data->port_id;
+       rxq->queue_id = queue_id;
+       dev->data->rx_queues[queue_id] = rxq;
+
+       /* liburing reports failure as a negative errno return value */
+       ret = io_uring_queue_init(nb_rx_desc, &rxq->io_ring, 0);
+       if (ret != 0) {
+               PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(-ret));
+               goto error_rxq_free;
+       }
+
+       /* mbufs[] only remembers what to free if setup fails part way */
+       mbufs = calloc(nb_rx_desc, sizeof(struct rte_mbuf *));
+       if (mbufs == NULL) {
+               PMD_LOG(ERR, "Rx mbuf pointer alloc failed");
+               goto error_iouring_exit;
+       }
+
+       /* open shared tap fd maybe already setup */
+       if (rtap_queue_open(dev, queue_id) < 0)
+               goto error_bulk_free;
+
+       fd = rtap_queue_fd(rxq->port_id, rxq->queue_id);
+
+       for (uint16_t i = 0; i < nb_rx_desc; i++) {
+               mbufs[i] = rtap_rx_alloc(rxq);
+               if (mbufs[i] == NULL) {
+                       PMD_LOG(ERR, "Rx mbuf alloc buf failed");
+                       goto error_bulk_free;
+               }
+
+               int n = rtap_rx_submit(rxq, fd, mbufs[i]);
+               if (n < 0) {
+                       PMD_LOG(ERR, "rtap_rx_submit failed: %s", strerror(-n));
+                       goto error_bulk_free;
+               }
+
+               /* If using readv() then n > 0 and all sqe's have been queued. */
+               if (n > 0)
+                       nsqe = 0;
+               else
+                       ++nsqe;
+       }
+
+       if (nsqe > 0) {
+               int n = io_uring_submit(&rxq->io_ring);
+               if (n < 0) {
+                       PMD_LOG(ERR, "Rx io_uring submit failed: %s", strerror(-n));
+                       goto error_bulk_free;
+               }
+               if (n < (int)nsqe)
+                       PMD_LOG(NOTICE, "Rx io_uring partial submit %d of %u", n, nsqe);
+       }
+
+       free(mbufs);
+       return 0;
+
+error_bulk_free:
+       /*
+        * can't use bulk free here because some of mbufs[] maybe NULL
+        * NOTE(review): reads already submitted may still be in flight
+        * when their mbufs are freed here - confirm this is safe.
+        */
+       for (uint16_t i = 0; i < nb_rx_desc; i++) {
+               if (mbufs[i] != NULL)
+                       rte_pktmbuf_free(mbufs[i]);
+       }
+       rtap_queue_close(dev, queue_id);
+       free(mbufs);
+error_iouring_exit:
+       io_uring_queue_exit(&rxq->io_ring);
+error_rxq_free:
+       /* do not leave a pointer to freed memory in the queue table */
+       dev->data->rx_queues[queue_id] = NULL;
+       rte_free(rxq);
+       return -1;
+}
+
+/*
+ * Cancel all pending io_uring operations and drain completions.
+ * Uses IORING_ASYNC_CANCEL_ALL to cancel all operations at once.
+ * Every drained CQE with non-NULL user_data is treated as an mbuf
+ * and freed.
+ * Returns the number of mbufs freed.
+ */
+static unsigned int
+rtap_cancel_all(struct io_uring *ring)
+{
+       struct io_uring_cqe *cqe;
+       struct io_uring_sqe *sqe;
+       unsigned int head, num_freed = 0;
+       int ret;
+
+       /* Cancel all pending operations using CANCEL_ALL flag */
+       sqe = io_uring_get_sqe(ring);
+       if (sqe != NULL) {
+               /* IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_ANY cancels all ops */
+               io_uring_prep_cancel(sqe, NULL,
+                                    IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_ANY);
+               io_uring_sqe_set_data(sqe, NULL);
+               ret = io_uring_submit(ring);
+               if (ret < 0)
+                       PMD_LOG(ERR, "cancel submit failed: %s", strerror(-ret));
+       }
+
+       /*
+        * One blocking wait to let the kernel deliver the cancel CQE
+        * and the CQEs for all cancelled operations.
+        */
+       io_uring_submit_and_wait(ring, 1);
+
+       /*
+        * Drain all CQEs non-blocking.  Cancellation of many pending
+        * operations may produce CQEs in waves; keep polling until the
+        * CQ is empty.
+        */
+       for (unsigned int retries = 0; retries < 10; retries++) {
+               unsigned int num_cqe = 0;
+
+               io_uring_for_each_cqe(ring, head, cqe) {
+                       struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data;
+
+                       ++num_cqe;
+
+                       /* Skip the cancel operation's own CQE (user_data = NULL) */
+                       if (mb != NULL) {
+                               rte_pktmbuf_free(mb);
+                               ++num_freed;
+                       }
+               }
+
+               if (num_cqe == 0)
+                       break;
+
+               /*
+                * Advance by the number of CQEs actually visited. The
+                * iteration picks up entries that arrive while it runs, so
+                * a count sampled beforehand could leave processed entries
+                * in the queue and their mbufs would be freed twice.
+                */
+               io_uring_cq_advance(ring, num_cqe);
+       }
+
+       return num_freed;
+}
+
+/*
+ * Tear down an Rx queue: cancel outstanding reads (freeing their
+ * mbufs), destroy the ring and free the queue structure.
+ */
+void
+rtap_rx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id)
+{
+       struct rtap_rx_queue *rxq = dev->data->rx_queues[queue_id];
+       bool tx_gone;
+
+       if (rxq == NULL)
+               return;
+
+       rtap_cancel_all(&rxq->io_ring);
+       io_uring_queue_exit(&rxq->io_ring);
+
+       rte_free(rxq);
+
+       /* Close the shared TAP fd if the tx queue is already gone */
+       tx_gone = queue_id >= dev->data->nb_tx_queues ||
+                 dev->data->tx_queues[queue_id] == NULL;
+       if (tx_gone)
+               rtap_queue_close(dev, queue_id);
+}
+
+/*
+ * Set up a Tx queue: open the queue's TAP fd, allocate the queue
+ * structure and create its io_uring.
+ *
+ * Returns 0 on success, -1 on failure. The queue is published in
+ * dev->data->tx_queues only once it is fully initialized.
+ */
+int
+rtap_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+                   uint16_t nb_tx_desc, unsigned int socket_id,
+                   const struct rte_eth_txconf *tx_conf)
+{
+       int ret;
+
+       /* open shared tap fd maybe already setup */
+       if (rtap_queue_open(dev, queue_id) < 0)
+               return -1;
+
+       /*
+        * NOTE(review): the failure paths below leave the fd opened above
+        * as it is - confirm whether rtap_queue_close() should be called.
+        */
+       struct rtap_tx_queue *txq = rte_zmalloc_socket(NULL, sizeof(*txq),
+                                                      RTE_CACHE_LINE_SIZE, socket_id);
+       if (txq == NULL) {
+               PMD_LOG(ERR, "txq alloc failed");
+               return -1;
+       }
+
+       txq->port_id = dev->data->port_id;
+       txq->queue_id = queue_id;
+       txq->free_thresh = tx_conf->tx_free_thresh;
+
+       /* liburing reports failure as a negative errno return value */
+       ret = io_uring_queue_init(nb_tx_desc, &txq->io_ring, 0);
+       if (ret != 0) {
+               PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(-ret));
+               rte_free(txq);
+               return -1;
+       }
+
+       /* publish last so a failed setup never leaves a stale pointer */
+       dev->data->tx_queues[queue_id] = txq;
+
+       return 0;
+}
+
+/*
+ * Reap finished Tx completions: free the mbuf attached to each CQE
+ * and count failed writes as Tx errors.
+ */
+static void
+rtap_tx_cleanup(struct rtap_tx_queue *txq)
+{
+       struct io_uring_cqe *cqe;
+       unsigned int head;
+       unsigned int reaped = 0;
+
+       io_uring_for_each_cqe(&txq->io_ring, head, cqe) {
+               struct rte_mbuf *done = (void *)(uintptr_t)cqe->user_data;
+
+               /* CQEs with NULL user_data (e.g., cancel operations) carry no mbuf */
+               if (done != NULL) {
+                       PMD_TX_LOG(DEBUG, " mbuf len %u result: %d", done->pkt_len, cqe->res);
+                       if (cqe->res < 0)
+                               txq->tx_errors++;
+                       rte_pktmbuf_free(done);
+               }
+               reaped++;
+       }
+       io_uring_cq_advance(&txq->io_ring, reaped);
+}
+
+/*
+ * Tear down a Tx queue: reap finished writes, cancel the rest
+ * (freeing their mbufs), destroy the ring and free the queue structure.
+ */
+void
+rtap_tx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id)
+{
+       struct rtap_tx_queue *txq = dev->data->tx_queues[queue_id];
+       bool rx_gone;
+
+       if (txq == NULL)
+               return;
+
+       /* Completed writes first, then whatever is still pending */
+       rtap_tx_cleanup(txq);
+       rtap_cancel_all(&txq->io_ring);
+       io_uring_queue_exit(&txq->io_ring);
+
+       rte_free(txq);
+
+       /* Close the shared TAP fd if the rx queue is already gone */
+       rx_gone = queue_id >= dev->data->nb_rx_queues ||
+                 dev->data->rx_queues[queue_id] == NULL;
+       if (rx_gone)
+               rtap_queue_close(dev, queue_id);
+}
+
+/*
+ * Fill in a virtio net header from the Tx offload flags and header
+ * lengths of an mbuf. The header is zeroed first, so a packet with no
+ * offload request gets an all-zero header.
+ */
+static void
+rtap_tx_offload(struct virtio_net_hdr *hdr, const struct rte_mbuf *m)
+{
+       const bool tso = (m->ol_flags & RTE_MBUF_F_TX_TCP_SEG) != 0;
+       uint64_t l4_csum = m->ol_flags & RTE_MBUF_F_TX_L4_MASK;
+       uint16_t outer_len = 0;
+
+       /* outer headers only count when a tunnel type is set */
+       if (m->ol_flags & RTE_MBUF_F_TX_TUNNEL_MASK)
+               outer_len = m->outer_l2_len + m->outer_l3_len;
+
+       memset(hdr, 0, sizeof(*hdr));
+
+       /* a TSO request also turns on the TCP checksum bit */
+       if (tso)
+               l4_csum |= RTE_MBUF_F_TX_TCP_CKSUM;
+
+       if (l4_csum == RTE_MBUF_F_TX_UDP_CKSUM) {
+               hdr->csum_start = outer_len + m->l2_len + m->l3_len;
+               hdr->csum_offset = offsetof(struct rte_udp_hdr, dgram_cksum);
+               hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
+       } else if (l4_csum == RTE_MBUF_F_TX_TCP_CKSUM) {
+               hdr->csum_start = outer_len + m->l2_len + m->l3_len;
+               hdr->csum_offset = offsetof(struct rte_tcp_hdr, cksum);
+               hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
+       }
+
+       /* TCP Segmentation Offload */
+       if (tso) {
+               if (m->ol_flags & RTE_MBUF_F_TX_IPV6)
+                       hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV6;
+               else
+                       hdr->gso_type = VIRTIO_NET_HDR_GSO_TCPV4;
+               hdr->gso_size = m->tso_segsz;
+               hdr->hdr_len = outer_len + m->l2_len + m->l3_len + m->l4_len;
+       }
+}
+
+/*
+ * Transmit burst posts mbufs into the io_uring TAP file descriptor
+ * by creating queue elements with write operation.
+ *
+ * The driver mimics the behavior of a real hardware NIC.
+ *
+ * If there is no space left in the io_uring then the driver will return
+ * the number of mbuf's that were processed to that point. The application
+ * can then decide to retry later or drop the unsent packets in case of
+ * backpressure.
+ *
+ * The transmit process puts the virtio header before the data. In some
+ * cases, a new mbuf is required from same pool as original; but if that
+ * fails, the packet is not sent and is silently dropped. This is to avoid
+ * situation where pool is so small that transmit gets stuck when pool
+ * resources are very low.
+ *
+ * Returns the number of mbufs consumed from bufs[], including packets
+ * dropped because no header mbuf could be allocated.
+ */
+uint16_t
+rtap_tx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+       struct rtap_tx_queue *txq = queue;
+       uint16_t i, num_tx = 0;
+       uint32_t num_tx_bytes = 0;
+
+       PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+       /*
+        * NOTE(review): completions are reaped only when submission queue
+        * space is below the threshold - confirm this happens often enough
+        * to return transmitted mbufs to their pool.
+        */
+       if (io_uring_sq_space_left(&txq->io_ring) < RTE_MAX(txq->free_thresh, nb_pkts))
+               rtap_tx_cleanup(txq);
+
+       int fd = rtap_queue_fd(txq->port_id, txq->queue_id);
+
+       for (i = 0; i < nb_pkts; i++) {
+               struct rte_mbuf *mb = bufs[i];
+               struct virtio_net_hdr *hdr;
+
+               /* Use packet head room space for virtio header (if possible) */
+               if (rte_mbuf_refcnt_read(mb) == 1 && RTE_MBUF_DIRECT(mb) &&
+                   rte_pktmbuf_headroom(mb) >= sizeof(*hdr)) {
+                       hdr = rte_pktmbuf_mtod_offset(mb, struct virtio_net_hdr *, -sizeof(*hdr));
+               } else {
+                       /* Need to chain a new mbuf to make room for virtio header */
+                       struct rte_mbuf *mh = rte_pktmbuf_alloc(mb->pool);
+                       if (unlikely(mh == NULL)) {
+                               PMD_TX_LOG(DEBUG, "mbuf pool exhausted on transmit");
+                               rte_pktmbuf_free(mb);
+                               ++txq->tx_errors;
+                               continue;
+                       }
+
+                       /* The packet headroom should be available in newly allocated mbuf */
+                       RTE_ASSERT(rte_pktmbuf_headroom(mh) >= sizeof(*hdr));
+
+                       hdr = rte_pktmbuf_mtod_offset(mh, struct virtio_net_hdr *, -sizeof(*hdr));
+                       mh->next = mb;
+                       mh->nb_segs = mb->nb_segs + 1;
+                       mh->pkt_len = mb->pkt_len;
+                       mh->ol_flags = mb->ol_flags & RTE_MBUF_F_TX_OFFLOAD_MASK;
+                       mb = mh;
+               }
+
+               struct io_uring_sqe *sqe = io_uring_get_sqe(&txq->io_ring);
+               if (sqe == NULL) {
+                       /* Drop header mbuf if it was used */
+                       if (mb != bufs[i])
+                               rte_pktmbuf_free_seg(mb);
+                       break;  /* submit ring is full */
+               }
+
+               /* Note: transmit bytes does not include virtio header */
+               num_tx_bytes += mb->pkt_len;
+
+               io_uring_sqe_set_data(sqe, mb);
+
+               /*
+                * Build the header from the caller's mbuf: a chained header
+                * mbuf only carries the offload flags, not the l2/l3/l4
+                * lengths or the TSO segment size.
+                */
+               rtap_tx_offload(hdr, bufs[i]);
+
+               PMD_TX_LOG(DEBUG, "write m=%p segs=%u", mb, mb->nb_segs);
+
+               /* Start of data written to kernel includes virtio net header */
+               void *buf = rte_pktmbuf_mtod_offset(mb, void *, -sizeof(*hdr));
+               unsigned int nbytes = sizeof(struct virtio_net_hdr) + mb->data_len;
+
+               if (mb->nb_segs == 1) {
+                       /* Single segment mbuf can go as write and batched */
+                       io_uring_prep_write(sqe, fd, buf, nbytes, 0);
+                       ++num_tx;
+               } else {
+                       /* Multi-segment mbuf needs scatter/gather */
+                       struct iovec iovs[RTE_MBUF_MAX_NB_SEGS + 1];
+                       unsigned int niov = mb->nb_segs;
+                       struct rte_mbuf *seg = mb;
+
+                       iovs[0].iov_base = buf;
+                       iovs[0].iov_len = nbytes;
+
+                       for (unsigned int v = 1; v < niov; v++) {
+                               seg = seg->next;
+                               iovs[v].iov_base = rte_pktmbuf_mtod(seg, void *);
+                               iovs[v].iov_len = seg->data_len;
+                       }
+
+                       io_uring_prep_writev(sqe, fd, iovs, niov, 0);
+
+                       /*
+                        * For writev, submit now since iovs[] is on the stack
+                        * and must remain valid until submitted.
+                        * This also submits any previously batched single-seg writes.
+                        */
+                       int err = io_uring_submit(&txq->io_ring);
+                       if (unlikely(err < 0)) {
+                               PMD_TX_LOG(ERR, "Tx io_uring submit failed: %s", strerror(-err));
+                               ++txq->tx_errors;
+                       }
+
+                       /* nothing is left waiting for the batched submit */
+                       num_tx = 0;
+               }
+       }
+
+       if (likely(num_tx > 0)) {
+               int err = io_uring_submit(&txq->io_ring);
+               if (unlikely(err < 0)) {
+                       PMD_LOG(ERR, "Tx io_uring submit failed: %s", strerror(-err));
+                       ++txq->tx_errors;
+               }
+       }
+
+       txq->tx_packets += i;
+       txq->tx_bytes += num_tx_bytes;
+
+       return i;
+}
-- 
2.51.0

Reply via email to