The generic ring implementation is the last caller of rte_atomic32_cmpset() in lib/, and it is blocking deprecation of the rte_atomicNN_*() family.
Replace cmpset with rte_atomic_compare_exchange_weak_explicit(), and convert head/tail loads/stores from implicit seq_cst to explicit acquire/release. Matches the HTS/RTS pattern. Acquire-load of d->head orders the subsequent load of s->tail (was rte_smp_rmb()). Acquire-load of s->tail pairs with the release-store of the counterpart tail in __rte_ring_update_tail(), which subsumes the previous wmb/rmb barriers. Weak CAS avoids arm64's hidden inner retry; the outer do-while already loops. CAS orderings relaxed: no data published by the reservation. The now-unused 'enqueue' parameter of __rte_ring_update_tail() is retained and marked __rte_unused, so the call sites are unchanged. Signed-off-by: Stephen Hemminger <[email protected]> --- lib/ring/rte_ring_generic_pvt.h | 64 +++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h index affd2d5ba7..9497f6737b 100644 --- a/lib/ring/rte_ring_generic_pvt.h +++ b/lib/ring/rte_ring_generic_pvt.h @@ -23,21 +23,25 @@ */ static __rte_always_inline void __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, - uint32_t new_val, uint32_t single, uint32_t enqueue) + uint32_t new_val, uint32_t single, + uint32_t enqueue __rte_unused) { - if (enqueue) - rte_smp_wmb(); - else - rte_smp_rmb(); /* * If there are other enqueues/dequeues in progress that preceded us, * we need to wait for them to complete */ if (!single) - rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val, - rte_memory_order_relaxed); + rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, + old_val, rte_memory_order_relaxed); - ht->tail = new_val; + /* + * Release ordering on the tail store ensures that the slot reads + * (dequeue) or writes (enqueue) performed by this thread are visible + * to the other side before the new tail value is observed. + * Pairs with the acquire load of the counterpart's tail in + * __rte_ring_headtail_move_head(). 
+ */ + rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release); } /** @@ -76,25 +80,35 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, { unsigned int max = n; int success; + uint32_t tail; do { /* Reset n to the initial burst count */ n = max; - *old_head = d->head; + /* + * Acquire load: orders this load before the load of s->tail + * below (replaces rte_smp_rmb() in the previous version) and + * re-establishes ordering after a failed CAS on retry. + */ + *old_head = rte_atomic_load_explicit(&d->head, + rte_memory_order_acquire); - /* add rmb barrier to avoid load/load reorder in weak - * memory model. It is noop on x86 + /* + * Acquire load on the counterpart's tail pairs with the + * release store in __rte_ring_update_tail() on the other + * side, ensuring slot operations performed there are visible + * before the caller accesses the reserved slots. */ - rte_smp_rmb(); + tail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire); /* * The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have - * *old_head > s->tail). So 'entries' is always between 0 + * *old_head > tail). So 'entries' is always between 0 * and capacity (which is < size). */ - *entries = (capacity + s->tail - *old_head); + *entries = (capacity + tail - *old_head); /* check that we have enough room in ring */ if (unlikely(n > *entries)) @@ -106,12 +120,24 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, *new_head = *old_head + n; if (is_st) { - d->head = *new_head; + rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed); success = 1; - } else - success = rte_atomic32_cmpset( - (uint32_t *)(uintptr_t)&d->head, - *old_head, *new_head); + } else { + /* + * Weak CAS: the outer do-while handles spurious + * failures, so we avoid the strong variant's + * internal retry (which on arm64 wraps the LL/SC + * pair in a hidden inner loop). 
+ * + * Relaxed on both success and failure: this CAS + * does not publish data. Slot data visibility is + * provided by the acquire loads above and the + * release store of tail in __rte_ring_update_tail(). + */ + success = rte_atomic_compare_exchange_weak_explicit( + &d->head, old_head, *new_head, + rte_memory_order_relaxed, rte_memory_order_relaxed); + } } while (unlikely(success == 0)); return n; } -- 2.53.0

