Thanks, Gavin. My comments are inline.

> -----Original Message-----
> From: Gavin Hu (Arm Technology China) [mailto:gavin...@arm.com]
> Sent: Monday, September 23, 2019 7:09 PM
> To: Liu, Yong <yong....@intel.com>; maxime.coque...@redhat.com; Bie, Tiwei
> <tiwei....@intel.com>; Wang, Zhihong <zhihong.w...@intel.com>
> Cc: dev@dpdk.org; nd <n...@arm.com>
> Subject: RE: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue function
> for packed ring
>
> Hi Marvin,
>
> Is it possible to vectorize the processing?
> Other comments inline:
> /Gavin

Gavin,
According to our experiments, vectorizing only some parts of the [ed]nqueue
functions does not improve performance. Functions like vhost_iova_to_vva and
virtio_enqueue_offload can't be easily vectorized because they are full of
conditional branches.

Thanks,
Marvin

> > -----Original Message-----
> > From: dev <dev-boun...@dpdk.org> On Behalf Of Marvin Liu
> > Sent: Friday, September 20, 2019 12:37 AM
> > To: maxime.coque...@redhat.com; tiwei....@intel.com;
> > zhihong.w...@intel.com
> > Cc: dev@dpdk.org; Marvin Liu <yong....@intel.com>
> > Subject: [dpdk-dev] [PATCH v2 03/16] vhost: add burst enqueue function for
> > packed ring
> >
> > Burst enqueue function will first check whether descriptors are cache
> > aligned. It will also check prerequisites in the beginning. Burst
> > enqueue function does not support chained mbufs; the single packet
> > enqueue function will handle them.
> >
> > Signed-off-by: Marvin Liu <yong....@intel.com>
> >
> > diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
> > index 5074226f0..67889c80a 100644
> > --- a/lib/librte_vhost/vhost.h
> > +++ b/lib/librte_vhost/vhost.h
> > @@ -39,6 +39,9 @@
> >
> >  #define VHOST_LOG_CACHE_NR 32
> >
> > +#define PACKED_DESCS_BURST (RTE_CACHE_LINE_SIZE / \
> > +			    sizeof(struct vring_packed_desc))
> > +
> >  #ifdef SUPPORT_GCC_UNROLL_PRAGMA
> >  #define PRAGMA_PARAM "GCC unroll 4"
> >  #endif
> > @@ -57,6 +60,8 @@
> >  #define UNROLL_PRAGMA(param) do {} while(0);
> >  #endif
> >
> > +#define PACKED_BURST_MASK (PACKED_DESCS_BURST - 1)
> > +
> >  /**
> >   * Structure contains buffer address, length and descriptor index
> >   * from vring to do scatter RX.
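
The burst size and alignment mask defined in vhost.h can be checked with a
small standalone sketch. It is not DPDK code and rests on assumptions:
CACHE_LINE_SIZE is fixed at 64 bytes here (RTE_CACHE_LINE_SIZE depends on the
target), struct packed_desc is a local copy of the 16-byte packed descriptor
layout, and burst_aligned() is a hypothetical helper name. The mask test only
implies cache-line alignment if the descriptor ring itself starts on a cache
line boundary.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: 64-byte cache lines; RTE_CACHE_LINE_SIZE varies by platform. */
#define CACHE_LINE_SIZE 64

/* Local copy of the packed descriptor layout (16 bytes), for illustration. */
struct packed_desc {
	uint64_t addr;
	uint32_t len;
	uint16_t id;
	uint16_t flags;
};

static_assert(sizeof(struct packed_desc) == 16,
	      "packed descriptor is expected to be 16 bytes");

/* Number of descriptors that fit in one cache line, and the matching mask. */
#define DESCS_BURST (CACHE_LINE_SIZE / sizeof(struct packed_desc))
#define BURST_MASK (DESCS_BURST - 1)

/*
 * Hypothetical helper: returns nonzero when avail_idx is a multiple of the
 * burst size, i.e. the next DESCS_BURST descriptors share one cache line
 * (assuming a cache-line-aligned ring base).
 */
static int burst_aligned(uint16_t avail_idx)
{
	return (avail_idx & BURST_MASK) == 0;
}
```

With these assumptions the burst is four descriptors, so indexes 0, 4, 8, ...
take the burst path and any other index falls back to the single packet path.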
> > diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
> > index 2b5c47145..c664b27c5 100644
> > --- a/lib/librte_vhost/virtio_net.c
> > +++ b/lib/librte_vhost/virtio_net.c
> > @@ -895,6 +895,84 @@ virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
> >  	return pkt_idx;
> >  }
> >
> > +static __rte_unused __rte_always_inline int
> I remember "__rte_always_inline" should come first, on a separate line;
> otherwise you will get a style issue.
> /Gavin
> > +virtio_dev_rx_burst_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> > +	 struct rte_mbuf **pkts)
> > +{
> > +	bool wrap_counter = vq->avail_wrap_counter;
> > +	struct vring_packed_desc *descs = vq->desc_packed;
> > +	uint16_t avail_idx = vq->last_avail_idx;
> > +
> > +	uint64_t desc_addrs[PACKED_DESCS_BURST];
> > +	struct virtio_net_hdr_mrg_rxbuf *hdrs[PACKED_DESCS_BURST];
> > +	uint32_t buf_offset = dev->vhost_hlen;
> > +	uint64_t lens[PACKED_DESCS_BURST];
> > +
> > +	uint16_t i;
> > +
> > +	if (unlikely(avail_idx & PACKED_BURST_MASK))
> > +		return -1;
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		if (unlikely(pkts[i]->next != NULL))
> > +			return -1;
> > +		if (unlikely(!desc_is_avail(&descs[avail_idx + i],
> > +					    wrap_counter)))
> > +			return -1;
> > +	}
> > +
> > +	rte_smp_rmb();
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > +		lens[i] = descs[avail_idx + i].len;
> Looks like the code is a strong candidate for vectorization.
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		if (unlikely(pkts[i]->pkt_len > (lens[i] - buf_offset)))
> > +			return -1;
> > +	}
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > +		desc_addrs[i] = vhost_iova_to_vva(dev, vq,
> > +						 descs[avail_idx + i].addr,
> > +						 &lens[i],
> > +						 VHOST_ACCESS_RW);
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		if (unlikely(lens[i] != descs[avail_idx + i].len))
> > +			return -1;
> > +	}
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		rte_prefetch0((void *)(uintptr_t)desc_addrs[i]);
> > +		hdrs[i] = (struct virtio_net_hdr_mrg_rxbuf *)desc_addrs[i];
> > +		lens[i] = pkts[i]->pkt_len + dev->vhost_hlen;
> > +	}
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++)
> > +		virtio_enqueue_offload(pkts[i], &hdrs[i]->hdr);
> > +
> A store barrier is missing here; last_avail_idx may be observed before the
> above enqueue completes on weakly ordered architectures.
> For x86, a compiler barrier is also required.
>

Thanks a lot for pointing this out. I guess you mean that a barrier needs to be
added between the memcpy and the enqueue. last_avail_idx is just a local
variable, so no barrier is needed to protect it.

> > +	vq->last_avail_idx += PACKED_DESCS_BURST;
> > +	if (vq->last_avail_idx >= vq->size) {
> > +		vq->last_avail_idx -= vq->size;
> > +		vq->avail_wrap_counter ^= 1;
> > +	}
> > +
> > +	UNROLL_PRAGMA(PRAGMA_PARAM)
> > +	for (i = 0; i < PACKED_DESCS_BURST; i++) {
> > +		rte_memcpy((void *)(uintptr_t)(desc_addrs[i] + buf_offset),
> > +			   rte_pktmbuf_mtod_offset(pkts[i], void *, 0),
> > +			   pkts[i]->pkt_len);
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> >  static __rte_unused int16_t
> >  virtio_dev_rx_single_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
> >  			    struct rte_mbuf *pkt)
> > --
> > 2.17.1
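
The index update near the end of the burst function can be exercised in
isolation. The sketch below is only an illustration of that wrap-around logic,
not the vhost implementation: struct ring_state and ring_advance() are
hypothetical names standing in for the relevant vhost_virtqueue fields, and it
assumes the ring size is at most 32768 so the 16-bit addition cannot overflow.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the vhost_virtqueue fields used by the patch. */
struct ring_state {
	uint16_t size;            /* number of descriptors in the ring */
	uint16_t last_avail_idx;  /* next descriptor index to consume */
	bool avail_wrap_counter;  /* flips each time the index wraps */
};

/*
 * Advance the available index by one burst. When the index runs past the
 * end of the ring it wraps to the start and the wrap counter is toggled,
 * mirroring the update in the quoted patch.
 */
static void ring_advance(struct ring_state *r, uint16_t burst)
{
	/* Assumption: a burst never exceeds the ring size. */
	assert(burst <= r->size);

	r->last_avail_idx += burst;
	if (r->last_avail_idx >= r->size) {
		r->last_avail_idx -= r->size;
		r->avail_wrap_counter ^= 1;
	}
}
```

For example, with a ring of 256 descriptors and last_avail_idx at 252, one
burst of 4 moves the index back to 0 and flips the wrap counter; the next
burst moves it to 4 and leaves the counter unchanged.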