Already looks good. I have one minor suggestion.

In |rte_ring_c11_pvt.h| (and in the MCS lock code as well), we introduced
a comment style that annotates load-acquire and store-release
operations as |An| and |Rm|, respectively. Each |An| comment refers to the
corresponding |Rm| it synchronizes with, and vice versa, while also describing
the intent of the pairing. I suggest using the same annotations for the
acquire/release pairs in this patch.


--wathsala

On 5/20/26 23:17, Stephen Hemminger wrote:
Last caller of rte_atomic32_cmpset() in lib/, blocking deprecation
of the rte_atomicNN_*() family.

Replace cmpset with rte_atomic_compare_exchange_weak_explicit(),
and convert head/tail loads/stores from implicit seq_cst to explicit
acquire/release. Matches the HTS/RTS pattern.

Acquire-load of d->head orders the subsequent load of s->tail (was
rte_smp_rmb()). Acquire-load of s->tail pairs with the release-store
of the counterpart tail in __rte_ring_update_tail(), which subsumes
the previous wmb/rmb barriers.

Weak CAS avoids arm64's hidden inner retry; the outer do-while already
loops. CAS orderings relaxed: no data published by the reservation.

The now-unused 'enqueue' parameter of __rte_ring_update_tail() is
marked __rte_unused; the signature and call sites are unchanged.

Signed-off-by: Stephen Hemminger <[email protected]>
---
  lib/ring/rte_ring_generic_pvt.h | 64 +++++++++++++++++++++++----------
  1 file changed, 45 insertions(+), 19 deletions(-)

diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index affd2d5ba7..9497f6737b 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -23,21 +23,25 @@
   */
  static __rte_always_inline void
  __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
-               uint32_t new_val, uint32_t single, uint32_t enqueue)
+               uint32_t new_val, uint32_t single,
+               uint32_t enqueue __rte_unused)
  {
-       if (enqueue)
-               rte_smp_wmb();
-       else
-               rte_smp_rmb();
        /*
         * If there are other enqueues/dequeues in progress that preceded us,
         * we need to wait for them to complete
         */
        if (!single)
-               rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
-                       rte_memory_order_relaxed);
+               rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail,
+                       old_val, rte_memory_order_relaxed);
- ht->tail = new_val;
+       /*
+        * Release ordering on the tail store ensures that the slot reads
+        * (dequeue) or writes (enqueue) performed by this thread are visible
+        * to the other side before the new tail value is observed.
+        * Pairs with the acquire load of the counterpart's tail in
+        * __rte_ring_headtail_move_head().
+        */
+       rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
  }
/**
@@ -76,25 +80,35 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
  {
        unsigned int max = n;
        int success;
+       uint32_t tail;
do {
                /* Reset n to the initial burst count */
                n = max;
- *old_head = d->head;
+               /*
+                * Acquire load: orders this load before the load of s->tail
+                * below (replaces rte_smp_rmb() in the previous version) and
+                * re-establishes ordering after a failed CAS on retry.
+                */
+               *old_head = rte_atomic_load_explicit(&d->head,
+                               rte_memory_order_acquire);
- /* add rmb barrier to avoid load/load reorder in weak
-                * memory model. It is noop on x86
+               /*
+                * Acquire load on the counterpart's tail pairs with the
+                * release store in __rte_ring_update_tail() on the other
+                * side, ensuring slot operations performed there are visible
+                * before the caller accesses the reserved slots.
                 */
-               rte_smp_rmb();
+               tail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
/*
                 *  The subtraction is done between two unsigned 32bits value
                 * (the result is always modulo 32 bits even if we have
-                * *old_head > s->tail). So 'entries' is always between 0
+                * *old_head > tail). So 'entries' is always between 0
                 * and capacity (which is < size).
                 */
-               *entries = (capacity + s->tail - *old_head);
+               *entries = (capacity + tail - *old_head);
/* check that we have enough room in ring */
                if (unlikely(n > *entries))
@@ -106,12 +120,24 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
*new_head = *old_head + n;
                if (is_st) {
-                       d->head = *new_head;
+                       rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
                        success = 1;
-               } else
-                       success = rte_atomic32_cmpset(
-                                       (uint32_t *)(uintptr_t)&d->head,
-                                       *old_head, *new_head);
+               } else {
+                       /*
+                        * Weak CAS: the outer do-while handles spurious
+                        * failures, so we avoid the strong variant's
+                        * internal retry (which on arm64 wraps the LL/SC
+                        * pair in a hidden inner loop).
+                        *
+                        * Relaxed on both success and failure: this CAS
+                        * does not publish data. Slot data visibility is
+                        * provided by the acquire loads above and the
+                        * release store of tail in __rte_ring_update_tail().
+                        */
+                       success = rte_atomic_compare_exchange_weak_explicit(
+                               &d->head, old_head, *new_head,
+                               rte_memory_order_relaxed, rte_memory_order_relaxed);
+               }
        } while (unlikely(success == 0));
        return n;
  }

Reply via email to