On 2015/1/27 17:37, Michael S. Tsirkin wrote:
> On Tue, Jan 27, 2015 at 03:57:13PM +0800, Linhaifeng wrote:
>> Hi,all
>>
>> I use vhost-user to send data to the VM. At first it works well, but after
>> many hours the VM can no longer receive data, though it can still send.
>>
>> (gdb)p avail_idx
>> $4 = 2668
>> (gdb)p free_entries
>> $5 = 0
>> (gdb)l
>>         /* check that we have enough buffers */
>>         if (unlikely(count > free_entries))
>>             count = free_entries;
>>
>>         if (count == 0){
>>             int b=0;
>>             if(b) { // when b is set to 1 to notify the guest, rx_ring starts working again
>>                 if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
>>
>>                     eventfd_write(vq->callfd, 1);
>>                 }
>>             }
>>             return 0;
>>         }
>>
>> some info i print in guest:
>>
>> net eth3:vi->num=199
>> net eth3:rvq info: num_free=57, used->idx=2668, avail->idx=2668
>> net eth3:svq info: num_free=254, used->idx=1644, avail->idx=1644
>>
>> net eth3:vi->num=199
>> net eth3:rvq info: num_free=57, used->idx=2668, avail->idx=2668
>> net eth3:svq info: num_free=254, used->idx=1645, avail->idx=1645
>>
>> net eth3:vi->num=199
>> net eth3:rvq info: num_free=57, used->idx=2668, avail->idx=2668
>> net eth3:svq info: num_free=254, used->idx=1646, avail->idx=1646
>>
>> # free
>>              total       used       free     shared    buffers     cached
>> Mem:      3924100      337252    3586848          0      95984     138060
>> -/+ buffers/cache:     103208    3820892
>> Swap:       970748          0     970748
>>
>> I have two questions:
>> 1. Do we need to notify the guest when there is no buffer in vq->avail?
> 
> No unless NOTIFY_ON_EMPTY is set (most guests don't set it).

Thank you, that is new knowledge for me :)
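
Just so I understand it, here is a rough sketch of how the count == 0 path
could honor that feature bit. VIRTIO_F_NOTIFY_ON_EMPTY is bit 24 in the virtio
spec; the helper name notify_guest_if_empty is made up by me, and treating
dev->features as the negotiated feature set (as the enqueue code below already
does for VIRTIO_NET_F_MRG_RXBUF) is my assumption, so please correct me if I
got that wrong:

/* Bit 24 in the virtio spec; define it here only if the headers don't. */
#ifndef VIRTIO_F_NOTIFY_ON_EMPTY
#define VIRTIO_F_NOTIFY_ON_EMPTY 24
#endif

/* Hypothetical helper: when the avail ring is empty, kick the guest only if
 * it negotiated NOTIFY_ON_EMPTY and has not suppressed interrupts. */
static inline void
notify_guest_if_empty(struct virtio_net *dev, struct vhost_virtqueue *vq)
{
        if (!(dev->features & (1ULL << VIRTIO_F_NOTIFY_ON_EMPTY)))
                return;
        if (vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
                return;
        eventfd_write(vq->callfd, 1);   /* same fd I poked in the gdb session above */
}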

> 
>> 2. Why does virtio_net stop filling the avail ring?
> 
> Most likely, it didn't get an interrupt.
> 
> If so, it would be a dpdk vhost user bug.
> Which code are you using in dpdk?
> 

Hi mst,

Thank you for your reply.
Sorry, my mail filter may have a bug, so I did not see this mail until now.

I am using the dpdk code from before commit 2bbb811. I have pasted the code here
for you to review. (Note that the vhost_enqueue_burst and vhost_dequeue_burst
functions run in poll mode.)

My guess is that when vhost_enqueue_burst has used up all the buffers in the
rx_ring, it tries to notify the guest to refill them, but at that moment the
vcpu may be exiting, so the guest never sees the notification. A rough sketch
of the ordering I am wondering about follows, before the pasted code.

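If it really is a missed notification, one detail I am unsure about in
vhost_enqueue_burst below is that there is no full memory barrier between the
used->idx update and the avail->flags read before the kick (the
rte_compiler_barrier() comes earlier, when the used entries are written).
Purely as a sketch of what I mean, not a claim that this is the actual bug, the
tail of the function with rte_mb() (DPDK's full memory barrier) would be:

        *(volatile uint16_t *) &vq->used->idx += count;
        vq->last_used_idx = res_end_idx;

        /* Make the used->idx update visible to the guest before reading its
         * interrupt-suppression flag; without a full barrier the flag read may
         * effectively happen first, which is the classic lost-wakeup window. */
        rte_mb();

        /* Kick the guest if necessary. */
        if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
                eventfd_write(vq->kickfd, 1);

        return count;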

/*
 * Enqueues packets to the guest virtio RX virtqueue for vhost devices.
 */
static inline uint32_t __attribute__((always_inline))
vhost_enqueue_burst(struct virtio_net *dev, struct rte_mbuf **pkts, unsigned count)
{
        struct vhost_virtqueue *vq;
        struct vring_desc *desc;
        struct rte_mbuf *buff;
        /* The virtio_hdr is initialised to 0. */
        struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0,0,0,0,0,0},0};
        uint64_t buff_addr = 0;
        uint64_t buff_hdr_addr = 0;
        uint32_t head[PKT_BURST_SIZE], packet_len = 0;
        uint32_t head_idx, packet_success = 0;
        uint32_t mergeable, mrg_count = 0;
        uint32_t retry = 0;
        uint16_t avail_idx, res_cur_idx;
        uint16_t res_base_idx, res_end_idx;
        uint16_t free_entries;
        uint8_t success = 0;

        LOG_DEBUG(APP, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
        vq = dev->virtqueue[VIRTIO_RXQ];
        count = (count > PKT_BURST_SIZE) ? PKT_BURST_SIZE : count;

        /* As many data cores may want access to available buffers, they need to be reserved. */
        do {
                res_base_idx = vq->last_used_idx_res;
                avail_idx = *((volatile uint16_t *)&vq->avail->idx);

                free_entries = (avail_idx - res_base_idx);
                /* If retry is enabled and the queue is full then we wait and retry to avoid packet loss. */
                if (unlikely(count > free_entries)) {
                        for (retry = 0; retry < burst_tx_retry_num; retry++) {
                                rte_delay_us(burst_tx_delay_time);
                                avail_idx =
                                        *((volatile uint16_t *)&vq->avail->idx);
                                free_entries = (avail_idx - res_base_idx);
                                if (count <= free_entries)
                                        break;
                        }
                }

                /* check that we have enough buffers */
                if (unlikely(count > free_entries))
                        count = free_entries;

                if (count == 0)                 // !!! when the VM can't receive, it always returns here
                        return 0;

                res_end_idx = res_base_idx + count;
                /* vq->last_used_idx_res is atomically updated. */
                success = rte_atomic16_cmpset(&vq->last_used_idx_res,
                                res_base_idx, res_end_idx);
        } while (unlikely(success == 0));
        res_cur_idx = res_base_idx;
        LOG_DEBUG(APP, "(%"PRIu64") Current Index %d| End Index %d\n",
                dev->device_fh, res_cur_idx, res_end_idx);

        /* Prefetch available ring to retrieve indexes. */
        rte_prefetch0(&vq->avail->ring[res_cur_idx & (vq->size - 1)]);

        /* Check if the VIRTIO_NET_F_MRG_RXBUF feature is enabled. */
        mergeable = dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF);

        /* Retrieve all of the head indexes first to avoid caching issues. */
        for (head_idx = 0; head_idx < count; head_idx++)
                head[head_idx] = vq->avail->ring[(res_cur_idx + head_idx) & (vq->size - 1)];

        /* Prefetch descriptor index. */
        rte_prefetch0(&vq->desc[head[packet_success]]);

        while (res_cur_idx != res_end_idx) {
                /* Get descriptor from available ring */
                desc = &vq->desc[head[packet_success]];

                buff = pkts[packet_success];

                /* Convert from gpa to vva (guest physical addr -> vhost virtual addr) */
                buff_addr = gpa_to_vva(dev, desc->addr);
                /* Prefetch buffer address. */
                rte_prefetch0((void*)(uintptr_t)buff_addr);

                if (mergeable && (mrg_count != 0)) {
                        desc->len = packet_len = rte_pktmbuf_data_len(buff);
                } else {
                        /* Copy virtio_hdr to packet and increment buffer address */
                        buff_hdr_addr = buff_addr;
                        packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen;

                        /*
                         * If the descriptors are chained the header and data
                         * are placed in separate buffers.
                         */
                        if (desc->flags & VRING_DESC_F_NEXT) {
                                desc->len = vq->vhost_hlen;
                                desc = &vq->desc[desc->next];
                                /* Buffer address translation. */
                                buff_addr = gpa_to_vva(dev, desc->addr);
                                desc->len = rte_pktmbuf_data_len(buff);
                        } else {
                                buff_addr += vq->vhost_hlen;
                                desc->len = packet_len;
                        }
                }

                /* Update used ring with desc information */
                vq->used->ring[res_cur_idx & (vq->size - 1)].id = head[packet_success];
                vq->used->ring[res_cur_idx & (vq->size - 1)].len = packet_len;

                /* Copy mbuf data to buffer */
                rte_memcpy((void *)(uintptr_t)buff_addr,
                        (const void*)buff->pkt.data, rte_pktmbuf_data_len(buff));

                PRINT_PACKET(dev, (uintptr_t)buff_addr, rte_pktmbuf_data_len(buff), 0);

                res_cur_idx++;
                packet_success++;

                /* If mergeable is disabled then a header is required per buffer. */
                if (!mergeable) {
                        rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
                                (const void*)&virtio_hdr, vq->vhost_hlen);
                        PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
                } else {
                        mrg_count++;
                        /* Merge buffer can only handle so many buffers at a time.
                         * Tell the guest if this limit is reached. */
                        if ((mrg_count == MAX_MRG_PKT_BURST) || (res_cur_idx == res_end_idx)) {
                                virtio_hdr.num_buffers = mrg_count;
                                LOG_DEBUG(APP, "(%"PRIu64") RX: Num merge buffers %d\n",
                                        dev->device_fh, virtio_hdr.num_buffers);
                                rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
                                        (const void*)&virtio_hdr, vq->vhost_hlen);
                                PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
                                mrg_count = 0;
                        }
                }
                if (res_cur_idx < res_end_idx) {
                        /* Prefetch descriptor index. */
                        rte_prefetch0(&vq->desc[head[packet_success]]);
                }
        }

        rte_compiler_barrier();

        /* Wait until it's our turn to add our buffer to the used ring. */
        while (unlikely(vq->last_used_idx != res_base_idx))
                rte_pause();

        *(volatile uint16_t *) &vq->used->idx += count;
        vq->last_used_idx = res_end_idx;

        /* Kick the guest if necessary. */
        if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
                eventfd_write(vq->kickfd,1);

        return count;
}

/*
 * Dequeues packets from the guest virtio TX virtqueue for vhost devices.
 */
static inline uint16_t __attribute__((always_inline))
vhost_dequeue_burst(struct virtio_net *dev, struct rte_mbuf **pkts, unsigned count)
{
        struct rte_mbuf *mbuf;
        struct vhost_virtqueue *vq;
        struct vring_desc *desc;
        uint64_t buff_addr = 0;
        uint32_t head[PKT_BURST_SIZE];
        uint32_t used_idx, i;
        uint16_t free_entries, packet_success = 0;
        uint16_t avail_idx;

        vq = dev->virtqueue[VIRTIO_TXQ];
        avail_idx = *((volatile uint16_t *)&vq->avail->idx);

        /* If there are no available buffers then return. */
        if (vq->last_used_idx == avail_idx)
                return 0;

        LOG_DEBUG(APP, "(%"PRIu64") virtio_dev_tx()\n", dev->device_fh);

        /* Prefetch available ring to retrieve head indexes. */
        rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);

        /* get the number of free entries in the ring */
        free_entries = (avail_idx - vq->last_used_idx);

        /* Limit to PKT_BURST_SIZE. */
        if (free_entries > count)
                free_entries = count;

        /*
         * Performance is better if cachelines containing descriptors are not
         * accessed by multiple cores. We try to finish with a cacheline before
         * passing it on.
         */
        if (likely(free_entries > DESC_PER_CACHELINE))
                free_entries = free_entries -
                        ((vq->last_used_idx + free_entries) % DESC_PER_CACHELINE);

        LOG_DEBUG(APP, "(%"PRIu64") Buffers available %d\n",
                dev->device_fh, free_entries);
        /* Retrieve all of the head indexes first to avoid caching issues. */
        for (i = 0; i < free_entries; i++)
                head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];

        /* Prefetch descriptor index. */
        rte_prefetch0(&vq->desc[head[packet_success]]);
        rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);

        while (packet_success < free_entries) {
                desc = &vq->desc[head[packet_success]];

                /* Discard first buffer as it is the virtio header */
                desc = &vq->desc[desc->next];

                /* Buffer address translation. */
                buff_addr = gpa_to_vva(dev, desc->addr);
                /* Prefetch buffer address. */
                rte_prefetch0((void*)(uintptr_t)buff_addr);

                used_idx = vq->last_used_idx & (vq->size - 1);

                if (packet_success < (free_entries - 1)) {
                        /* Prefetch descriptor index. */
                        rte_prefetch0(&vq->desc[head[packet_success+1]]);
                        rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
                }

                /* Update used index buffer information. */
                vq->used->ring[used_idx].id = head[packet_success];
                vq->used->ring[used_idx].len = 0;

                /* Allocate an mbuf and populate the structure. */
                mbuf = rte_pktmbuf_alloc(pktmbuf_pool);
                if (unlikely(mbuf == NULL)) {
                        RTE_LOG(ERR, APP, "Failed to allocate memory for mbuf.\n");
                        return packet_success;
                }

                /* Setup dummy mbuf. */
                mbuf->pkt.data_len = desc->len;
                mbuf->pkt.pkt_len = mbuf->pkt.data_len;

                rte_memcpy((void*) mbuf->pkt.data,
                        (const void*) buff_addr, mbuf->pkt.data_len);

                pkts[packet_success]=mbuf;

                PRINT_PACKET(dev, (uintptr_t)buff_addr, desc->len, 0);

                vq->last_used_idx++;
                packet_success++;
        }

        rte_compiler_barrier();
        vq->used->idx += packet_success;
        /* Kick guest if required. */
        if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
                eventfd_write(vq->kickfd,1);
        return packet_success;
}

>>
>>
>>
>>
>>
>> -- 
>> Regards,
>> Haifeng
> 
> .
> 

-- 
Regards,
Haifeng
