Add the Rx data path: dequeue packets from the peer guest's Tx virtqueue and hook them up via the rx_pkt_burst callback.

Signed-off-by: Zhiyong Yang <zhiyong.y...@intel.com>
---
 drivers/net/vhostpci/vhostpci_ethdev.c | 311 +++++++++++++++++++++++++++++++++
 1 file changed, 311 insertions(+)

diff --git a/drivers/net/vhostpci/vhostpci_ethdev.c 
b/drivers/net/vhostpci/vhostpci_ethdev.c
index 0582f73b7..06e3f5c50 100644
--- a/drivers/net/vhostpci/vhostpci_ethdev.c
+++ b/drivers/net/vhostpci/vhostpci_ethdev.c
@@ -49,6 +49,10 @@
 #include "vhostpci_logs.h"
 #include "vhostpci_ethdev.h"
 
+#define MAX_BATCH_LEN 256
+#define VHOSTPCI_MAX_PKT_BURST 32
+#define VHOSTPCI_BUF_VECTOR_MAX 256
+
 static void
 vhostpci_dev_info_get(struct rte_eth_dev *dev,
                struct rte_eth_dev_info *dev_info);
@@ -92,6 +96,10 @@ vhostpci_dev_tx_queue_setup(struct rte_eth_dev *dev, 
uint16_t tx_queue_id,
 static int
 vhostpci_init_device(struct rte_eth_dev *eth_dev, uint64_t req_features);
 
+static uint16_t
+vhostpci_dequeue_burst(struct vhostpci_net *dev, uint16_t queue_id,
+       struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count);
+
 static int
 vhostpci_dev_start(struct rte_eth_dev *dev);
 
@@ -313,6 +321,308 @@ vhostpci_dev_tx_queue_setup(struct rte_eth_dev *dev, 
uint16_t tx_queue_id,
        return 0;
 }
 
+/* Post one completed descriptor chain head back to the used ring slot
+ * @used_idx.  len is left 0 on this (dequeue) side: the peer only needs
+ * its descriptor id returned.
+ */
+static __rte_always_inline void
+update_used_ring(struct vhostpci_virtqueue *vq,
+                uint32_t used_idx, uint32_t desc_idx)
+{
+       vq->used->ring[used_idx].id  = desc_idx;
+       vq->used->ring[used_idx].len = 0;
+}
+
+/**
+ * Copy one descriptor chain from the remote guest's memory into mbuf @m,
+ * allocating extra mbufs from @mbuf_pool when the chain holds more data
+ * than one mbuf.  Copies no larger than MAX_BATCH_LEN may be deferred
+ * into vq->batch_copy_elems and flushed later by do_data_copy_dequeue().
+ *
+ * @max_desc bounds every next-index and the total chain length so a
+ * malformed or hostile peer cannot make us walk outside the descriptor
+ * table or loop forever.
+ *
+ * Returns 0 on success, -1 on a malformed chain, a failed guest-physical
+ * address translation, or mbuf allocation failure.
+ */
+static __rte_always_inline int
+copy_desc_to_mbuf(struct vhostpci_net *dev, struct vhostpci_virtqueue *vq,
+                 struct vring_desc *descs, uint16_t max_desc,
+                 struct rte_mbuf *m, uint16_t desc_idx,
+                 struct rte_mempool *mbuf_pool)
+{
+       struct vring_desc *desc;
+       uint64_t desc_addr;
+       uint32_t desc_avail, desc_offset;
+       uint32_t mbuf_avail, mbuf_offset;
+       uint32_t cpy_len;
+       struct rte_mbuf *cur = m, *prev = m;
+       /* A counter to avoid desc dead loop chain */
+       uint32_t nr_desc = 1;
+       struct batch_copy_elem *batch_copy = vq->batch_copy_elems;
+       uint16_t copy_nb = vq->batch_copy_nb_elems;
+       int error = 0;
+
+       desc = &descs[desc_idx];
+       if (unlikely(desc->len < dev->vhost_hlen)) {
+               error = -1;
+               goto out;
+       }
+
+       desc_addr = remote_gpa_to_vva(dev, desc->addr);
+
+       if (unlikely(!desc_addr)) {
+               error = -1;
+               goto out;
+       }
+
+       /**
+        * A virtio driver normally uses at least 2 desc buffers
+        * for Tx: the first for storing the header, and others
+        * for storing the data.
+        */
+       if (likely((desc->len == dev->vhost_hlen) &&
+                  (desc->flags & VRING_DESC_F_NEXT) != 0)) {
+               /* Validate the chain link before following it; the
+                * in-loop hop below already does this, but this first
+                * hop must be bounded too.
+                */
+               if (unlikely(desc->next >= max_desc)) {
+                       error = -1;
+                       goto out;
+               }
+               desc = &descs[desc->next];
+               if (unlikely(desc->flags & VRING_DESC_F_INDIRECT)) {
+                       error = -1;
+                       goto out;
+               }
+
+               desc_addr = remote_gpa_to_vva(dev, desc->addr);
+               if (unlikely(!desc_addr)) {
+                       error = -1;
+                       goto out;
+               }
+
+               desc_offset = 0;
+               desc_avail  = desc->len;
+               nr_desc    += 1;
+       } else {
+               /* Header and packet data share the first descriptor. */
+               desc_avail  = desc->len - dev->vhost_hlen;
+               desc_offset = dev->vhost_hlen;
+       }
+
+       rte_prefetch0((void *)(uintptr_t)(desc_addr + desc_offset));
+
+       mbuf_offset = 0;
+       mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
+       while (1) {
+               cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+
+               /* Copy immediately when the segment is large, the batch
+                * buffer is full, or we are still filling the head mbuf;
+                * otherwise defer into the batched copy list.
+                */
+               if (likely(cpy_len > MAX_BATCH_LEN ||
+                          copy_nb >= vq->size ||
+                          (cur == m))) {
+                       rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
+                               mbuf_offset), (void *)((uintptr_t)(desc_addr +
+                               desc_offset)), cpy_len);
+               } else {
+                       batch_copy[copy_nb].dst =
+                               rte_pktmbuf_mtod_offset(cur, void *,
+                                                               mbuf_offset);
+                       batch_copy[copy_nb].src =
+                               (void *)((uintptr_t)(desc_addr +
+                                                    desc_offset));
+                       batch_copy[copy_nb].len = cpy_len;
+                       copy_nb++;
+               }
+
+               mbuf_avail  -= cpy_len;
+               mbuf_offset += cpy_len;
+               desc_avail  -= cpy_len;
+               desc_offset += cpy_len;
+
+               /* This desc reaches to its end, get the next one */
+               if (desc_avail == 0) {
+                       if ((desc->flags & VRING_DESC_F_NEXT) == 0)
+                               break;
+
+                       if (unlikely(desc->next >= max_desc ||
+                                    ++nr_desc > max_desc)) {
+                               error = -1;
+                               goto out;
+                       }
+                       desc = &descs[desc->next];
+                       if (unlikely(desc->flags & VRING_DESC_F_INDIRECT)) {
+                               error = -1;
+                               goto out;
+                       }
+
+                       desc_addr = remote_gpa_to_vva(dev, desc->addr);
+                       if (unlikely(!desc_addr)) {
+                               error = -1;
+                               goto out;
+                       }
+
+                       rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+                       desc_offset = 0;
+                       desc_avail  = desc->len;
+
+               }
+
+               /**
+                * This mbuf reaches to its end, get a new one
+                * to hold more data.
+                */
+               if (mbuf_avail == 0) {
+                       cur = rte_pktmbuf_alloc(mbuf_pool);
+                       if (unlikely(cur == NULL)) {
+                               error = -1;
+                               goto out;
+                       }
+
+                       prev->next = cur;
+                       prev->data_len = mbuf_offset;
+                       m->nb_segs += 1;
+                       m->pkt_len += mbuf_offset;
+                       prev = cur;
+
+                       mbuf_offset = 0;
+                       mbuf_avail  = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+               }
+       }
+
+       /* Close out the last (possibly only) mbuf segment. */
+       prev->data_len = mbuf_offset;
+       m->pkt_len    += mbuf_offset;
+
+out:
+       /* Publish the number of pending batched copies back to the vq. */
+       vq->batch_copy_nb_elems = copy_nb;
+
+       return error;
+}
+
+/* Flush the small copies that copy_desc_to_mbuf() deferred into
+ * vq->batch_copy_elems.  The caller resets batch_copy_nb_elems for the
+ * next burst.
+ */
+static inline void
+do_data_copy_dequeue(struct vhostpci_virtqueue *vq)
+{
+       struct batch_copy_elem *elem = vq->batch_copy_elems;
+       uint16_t count = vq->batch_copy_nb_elems;
+       uint16_t i;
+
+       for (i = 0; i < count; i++)
+               rte_memcpy(elem[i].dst, elem[i].src, elem[i].len);
+}
+
+/* Publish @count newly filled used-ring entries to the peer.  The wmb
+ * orders the ring-entry stores (update_used_ring) before the idx store.
+ * NOTE(review): the accompanying rmb mirrors the upstream vhost pairing —
+ * presumably ordering the preceding avail-ring reads before the idx
+ * publication; confirm against the vhost/virtio memory-ordering model.
+ */
+static __rte_always_inline void
+update_used_idx(struct vhostpci_virtqueue *vq, uint32_t count)
+{
+       if (unlikely(count == 0))
+               return;
+
+       rte_smp_wmb();
+       rte_smp_rmb();
+
+       vq->used->idx += count;
+}
+
+/**
+ * Dequeue up to @count packets from virtqueue @queue_id of the peer
+ * guest into freshly allocated mbufs from @mbuf_pool.
+ *
+ * Returns the number of packets stored in @pkts; 0 on a NULL dev, an
+ * invalid/disabled queue, or an empty avail ring.  On an mbuf
+ * allocation or copy failure the burst is cut short and only the
+ * completed entries are consumed/returned.
+ */
+static uint16_t
+vhostpci_dequeue_burst(struct vhostpci_net *dev, uint16_t queue_id,
+               struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts,
+               uint16_t count)
+{
+       struct vhostpci_virtqueue *vq;
+       uint32_t desc_indexes[VHOSTPCI_MAX_PKT_BURST];
+       uint32_t used_idx;
+       uint32_t i = 0;
+       uint16_t free_entries;
+       uint16_t avail_idx;
+
+       if (!dev)
+               return 0;
+
+       if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring)))
+               return 0;
+
+       vq = dev->virtqueue[queue_id];
+       if (unlikely(vq->enabled == 0))
+               return 0;
+
+       /* Start a fresh batched-copy list for this burst. */
+       vq->batch_copy_nb_elems = 0;
+
+       /* Snapshot the peer's avail index; the volatile access forces a
+        * fresh read of the shared ring memory.
+        */
+       free_entries = *((volatile uint16_t *)&vq->avail->idx) -
+                       vq->last_avail_idx;
+       if (free_entries == 0)
+               return 0;
+
+       /* Prefetch available and used ring */
+       avail_idx = vq->last_avail_idx & (vq->size - 1);
+       used_idx  = vq->last_used_idx  & (vq->size - 1);
+       rte_prefetch0(&vq->avail->ring[avail_idx]);
+       rte_prefetch0(&vq->used->ring[used_idx]);
+
+       count = RTE_MIN(count, VHOSTPCI_MAX_PKT_BURST);
+       count = RTE_MIN(count, free_entries);
+
+       /* Retrieve all of the head indexes first to avoid caching issues. */
+       for (i = 0; i < count; i++) {
+               avail_idx = (vq->last_avail_idx + i) & (vq->size - 1);
+               used_idx  = (vq->last_used_idx  + i) & (vq->size - 1);
+               desc_indexes[i] = vq->avail->ring[avail_idx];
+               /* Pre-fill the used ring; the peer cannot see these
+                * entries until used->idx is advanced below.
+                */
+               update_used_ring(vq, used_idx, desc_indexes[i]);
+       }
+
+       /* Prefetch descriptor index. */
+       rte_prefetch0(&vq->desc[desc_indexes[0]]);
+       for (i = 0; i < count; i++) {
+               struct vring_desc *desc;
+               uint16_t sz, idx;
+               int err;
+
+               if (likely(i + 1 < count))
+                       rte_prefetch0(&vq->desc[desc_indexes[i + 1]]);
+
+               desc = vq->desc;
+               sz = vq->size;
+               idx = desc_indexes[i];
+
+               pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
+               if (unlikely(pkts[i] == NULL))
+                       break;
+
+               err = copy_desc_to_mbuf(dev, vq, desc, sz, pkts[i], idx,
+                                       mbuf_pool);
+               if (unlikely(err)) {
+                       rte_pktmbuf_free(pkts[i]);
+                       break;
+               }
+
+       }
+       /* i may be < count after an early break: consume/expose only the
+        * chains that were fully copied.
+        */
+       vq->last_avail_idx += i;
+
+       /* Flush deferred copies before making entries visible. */
+       do_data_copy_dequeue(vq);
+       vq->last_used_idx += i;
+       update_used_idx(vq, i);
+
+       return i;
+}
+
+/* Burst Rx callback: receive by dequeuing from the peer guest's Tx
+ * virtqueue.  allow_queuing / while_queuing form a simple handshake
+ * with the control path: we raise while_queuing, then re-check
+ * allow_queuing so a concurrent stop can wait for the flag to drop.
+ */
+static uint16_t
+eth_vhostpci_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
+{
+       struct vhostpci_queue *r = q;
+       uint16_t i, nb_rx = 0;
+       uint16_t nb_receive = nb_bufs;
+
+       if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+               return 0;
+
+       rte_atomic32_set(&r->while_queuing, 1);
+
+       /* Re-check after publishing while_queuing (see handshake above). */
+       if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+               goto out;
+
+       /* Dequeue packets from TX queue in the other guest */
+       while (nb_receive) {
+               uint16_t nb_pkts;
+               uint16_t num = (uint16_t)RTE_MIN(nb_receive,
+                                                VHOSTPCI_MAX_PKT_BURST);
+
+               nb_pkts = vhostpci_dequeue_burst(r->vpnet, r->virtqueue_id,
+                                                r->mb_pool, &bufs[nb_rx],
+                                                num);
+
+               nb_rx += nb_pkts;
+               nb_receive -= nb_pkts;
+               /* A short burst means the ring is drained (or an error). */
+               if (nb_pkts < num)
+                       break;
+       }
+
+       r->stats.pkts += nb_rx;
+
+       /* Stamp the receiving port and account bytes per packet. */
+       for (i = 0; likely(i < nb_rx); i++) {
+               bufs[i]->port = r->port_id;
+               r->stats.bytes += bufs[i]->pkt_len;
+       }
+
+out:
+       rte_atomic32_set(&r->while_queuing, 0);
+
+       return nb_rx;
+}
+
 static int
 vhostpci_dev_atomic_read_link_status(struct rte_eth_dev *dev,
                struct rte_eth_link *link)
@@ -716,6 +1026,7 @@ eth_vhostpci_dev_init(struct rte_eth_dev *eth_dev)
                rte_intr_callback_register(eth_dev->intr_handle,
                        vhostpci_interrupt_handler, eth_dev);
 
+       eth_dev->rx_pkt_burst = &eth_vhostpci_rx;
        return 0;
 }
 
-- 
2.13.3

Reply via email to