> Replace deprecated rte_atomic32 operations in the vmbus ring buffer
> producer with stdatomic equivalents, and replace the smp_wmb + CAS-spin
> publish with rte_wait_until_equal_32 + release-store.
> 
> The two-cursor design is preserved: tbr->windex is the driver-private
> reservation cursor that lets producers reserve slots concurrently without a
> lock; vbr->windex is the host-visible commit cursor, updated in reservation
> order so the host never observes windex pointing past unwritten data. This is
> the lockless analogue of the spinlock-around-single-cursor pattern used by
> the Linux (drivers/hv/ring_buffer.c hv_ringbuffer_write) and FreeBSD
> (sys/dev/hyperv/vmbus/vmbus_br.c vmbus_txbr_write) implementations of the
> same host contract.
> 
> The memory ordering mirrors __rte_ring_headtail_move_head and
> __rte_ring_update_tail in lib/ring/rte_ring_c11_pvt.h: relaxed wait for the
> previous producer's commit, release-store to publish. The rte_smp_wmb
> before the publish is folded into the release ordering on the store itself.
> 
> The host-shared vbr->windex remains volatile uint32_t in the packed bufring
> struct; the atomic qualifier is added via cast at the access site. The
> (uintptr_t) launder on the store-side cast suppresses a spurious
> misaligned-atomic warning from the packed-struct attribute (windex is 4-byte
> aligned in practice, at offset 0 of a page-aligned struct).
> 
> Signed-off-by: Stephen Hemminger <[email protected]>

Reviewed-by: Long Li <[email protected]>

> ---
>  drivers/bus/vmbus/private.h       |  2 +-
>  drivers/bus/vmbus/vmbus_bufring.c | 39 +++++++++++++++++--------------
>  2 files changed, 23 insertions(+), 18 deletions(-)
> 
> diff --git a/drivers/bus/vmbus/private.h b/drivers/bus/vmbus/private.h index
> 6efac86b77..6b7782724f 100644
> --- a/drivers/bus/vmbus/private.h
> +++ b/drivers/bus/vmbus/private.h
> @@ -25,7 +25,7 @@ extern int vmbus_logtype_bus;  struct vmbus_br {
>       struct vmbus_bufring *vbr;
>       uint32_t        dsize;
> -     uint32_t        windex; /* next available location */
> +     RTE_ATOMIC(uint32_t) windex; /* next available location */
>  };
> 
>  #define UIO_NAME_MAX 64
> diff --git a/drivers/bus/vmbus/vmbus_bufring.c
> b/drivers/bus/vmbus/vmbus_bufring.c
> index fcb97287dc..624fe8b6c5 100644
> --- a/drivers/bus/vmbus/vmbus_bufring.c
> +++ b/drivers/bus/vmbus/vmbus_bufring.c
> @@ -15,7 +15,7 @@
>  #include <rte_tailq.h>
>  #include <rte_log.h>
>  #include <rte_malloc.h>
> -#include <rte_atomic.h>
> +#include <rte_stdatomic.h>
>  #include <rte_memory.h>
>  #include <rte_pause.h>
>  #include <rte_bus_vmbus.h>
> @@ -114,6 +114,7 @@ vmbus_txbr_write(struct vmbus_br *tbr, const struct
> iovec iov[], int iovlen,
>       uint32_t ring_size = tbr->dsize;
>       uint32_t old_windex, next_windex, windex, total;
>       uint64_t save_windex;
> +     bool success;
>       int i;
> 
>       total = 0;
> @@ -121,17 +122,13 @@ vmbus_txbr_write(struct vmbus_br *tbr, const
> struct iovec iov[], int iovlen,
>               total += iov[i].iov_len;
>       total += sizeof(save_windex);
> 
> +     /* Get current free location */
> +     old_windex = rte_atomic_load_explicit(&tbr->windex,
> +                                           rte_memory_order_relaxed);
> +
>       /* Reserve space in ring */
>       do {
> -             uint32_t avail;
> -
> -             /* Get current free location */
> -             old_windex = tbr->windex;
> -
> -             /* Prevent compiler reordering this with calculation */
> -             rte_compiler_barrier();
> -
> -             avail = vmbus_br_availwrite(tbr, old_windex);
> +             uint32_t avail = vmbus_br_availwrite(tbr, old_windex);
> 
>               /* If not enough space in ring, then tell caller. */
>               if (avail <= total)
> @@ -139,8 +136,13 @@ vmbus_txbr_write(struct vmbus_br *tbr, const
> struct iovec iov[], int iovlen,
> 
>               next_windex = vmbus_br_idxinc(old_windex, total, ring_size);
> 
> -             /* Atomic update of next write_index for other threads */
> -     } while (!rte_atomic32_cmpset(&tbr->windex, old_windex,
> next_windex));
> +             /* Atomic update of next write_index for other threads
> +              * Can use weak since easy to recompute and retry.
> +              */
> +             success = rte_atomic_compare_exchange_weak_explicit(
> +                             &tbr->windex, &old_windex, next_windex,
> +                             rte_memory_order_acquire,
> rte_memory_order_relaxed);
> +     } while (unlikely(!success));
> 
>       /* Space from old..new is now reserved */
>       windex = old_windex;
> @@ -157,12 +159,15 @@ vmbus_txbr_write(struct vmbus_br *tbr, const
> struct iovec iov[], int iovlen,
>       /* The region reserved should match region used */
>       RTE_ASSERT(windex == next_windex);
> 
> -     /* Ensure that data is available before updating host index */
> -     rte_smp_wmb();
> +     /* Wait for previous producer to publish their windex update */
> +     rte_wait_until_equal_32(&vbr->windex, old_windex,
> +rte_memory_order_relaxed);
> 
> -     /* Checkin for our reservation. wait for our turn to update host */
> -     while (!rte_atomic32_cmpset(&vbr->windex, old_windex,
> next_windex))
> -             rte_pause();
> +     /* Publish our windex update; prior data writes ordered via release.
> +      * windex is 4-byte aligned in practice (struct is page-aligned, windex
> +      * at offset 0); cast launders the packed-struct alignment-1 attribute.
> +      */
> +     rte_atomic_store_explicit((volatile __rte_atomic uint32_t
> *)(uintptr_t)&vbr->windex,
> +                               next_windex, rte_memory_order_release);
> 
>       /* If host had read all data before this, then need to signal */
>       *need_sig |= vmbus_txbr_need_signal(vbr, old_windex);
> --
> 2.53.0

Reply via email to