Enforce a safe partial order by giving the CAS release semantics and
the preceding load of ht.raw acquire semantics. This creates a pairwise
happens-before relationship between threads of the same role.
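
For reference, a minimal stand-alone sketch of that pairing (pos_word and
claim() are illustrative names, not the ring code): the thread that wins
the release CAS publishes its position update, and the next thread of the
same role picks it up with an acquire load, so the winner's earlier writes
are ordered before the successor's later accesses.

  #include <stdatomic.h>
  #include <stdint.h>

  /* shared position word of one role, standing in for ht.raw */
  _Atomic uint64_t pos_word;

  uint64_t
  claim(uint32_t n)
  {
      uint64_t cur, next;

      do {
          /* acquire: pairs with the release CAS of the previous winner */
          cur = atomic_load_explicit(&pos_word, memory_order_acquire);
          next = cur + n;
          /*
           * release on success: the next same-role thread that
           * acquire-loads pos_word observes everything this thread
           * wrote before the CAS.  Relaxed on failure is enough,
           * the retry starts over with a fresh acquire load.
           */
      } while (!atomic_compare_exchange_strong_explicit(&pos_word, &cur,
              next, memory_order_release, memory_order_relaxed));

      return cur;
  }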

Combine the two load-acquire operations of ht.raw, which were previously
split across the two paths of a conditional branch, into
__rte_ring_hts_head_wait. This simplifies the branching logic and makes
the synchronization behavior easier to understand.
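
Roughly, the consolidated helper now has the shape sketched below
(wait_idle(), union pos and its field layout are made-up stand-ins for
__rte_ring_hts_head_wait() and __rte_ring_hts_pos): the caller picks the
memory order, the helper spins until head equals tail, and the returned
snapshot is fed straight into the CAS, which fails and retries if the
snapshot has gone stale.

  #include <stdatomic.h>
  #include <stdint.h>

  union pos {           /* packed head/tail word, illustrative layout */
      uint64_t raw;
      struct {
          uint32_t head;
          uint32_t tail;
      } p;
  };

  uint64_t
  wait_idle(_Atomic uint64_t *w, memory_order mo)
  {
      union pos v;

      v.raw = atomic_load_explicit(w, mo);
      while (v.p.head != v.p.tail) {
          /* an rte_pause()-style cpu relax would go here */
          v.raw = atomic_load_explicit(w, mo);
      }
      return v.raw;
  }

A returned snapshot may already be stale; that is harmless, because the
caller's CAS on the same word only succeeds if the snapshot is still
current, and on failure the caller simply waits again.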

Add comments explaining the synchronizes-with edges in detail.

Fixes: 1cc363b8ce06e ("ring: introduce HTS ring mode")
Cc: [email protected]

Signed-off-by: Wathsala Vithanage <[email protected]>
Signed-off-by: Ola Liljedahl <[email protected]>
---
 lib/ring/rte_ring_hts_elem_pvt.h | 66 ++++++++++++++++++++++++--------
 1 file changed, 49 insertions(+), 17 deletions(-)

diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
index e2b82dd1e6..a01089d15d 100644
--- a/lib/ring/rte_ring_hts_elem_pvt.h
+++ b/lib/ring/rte_ring_hts_elem_pvt.h
@@ -32,22 +32,40 @@ __rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail,
        RTE_SET_USED(enqueue);
 
        tail = old_tail + num;
+
+       /*
+        * R0: Release the tail update. Establishes a synchronizes-with edge with
+        * the load-acquire at A1. This release ensures that all updates to *ht
+        * and the ring array made by this thread become visible to the opposing
+        * thread once the tail value written here is observed.
+        */
        rte_atomic_store_explicit(&ht->ht.pos.tail, tail, rte_memory_order_release);
 }
 
 /**
- * @internal waits till tail will become equal to head.
- * Means no writer/reader is active for that ring.
- * Suppose to work as serialization point.
+ * @internal
+ * Waits until the tail becomes equal to the head.
+ * This indicates that another thread has finished its transaction, and there
+ * is a chance that we could be the next writer or reader in line.
+ *
+ * Returns ht.raw at this point. The value may be imprecise, since another
+ * thread might change the state before we observe ht.raw, but that does not
+ * matter. __rte_ring_hts_move_head() detects the stale value at its
+ * linearization point (the CAS) and calls this function again.
  */
-static __rte_always_inline void
+static __rte_always_inline union __rte_ring_hts_pos
 __rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
-               union __rte_ring_hts_pos *p)
+                        rte_memory_order memorder)
 {
-       while (p->pos.head != p->pos.tail) {
+       union __rte_ring_hts_pos p;
+       p.raw = rte_atomic_load_explicit(&ht->ht.raw, memorder);
+
+       while (p.pos.head != p.pos.tail) {
                rte_pause();
-               p->raw = rte_atomic_load_explicit(&ht->ht.raw, rte_memory_order_acquire);
+               p.raw = rte_atomic_load_explicit(&ht->ht.raw, memorder);
        }
+
+       return p;
 }
 
 /**
@@ -80,11 +98,9 @@ __rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
        uint32_t *entries)
 {
-       uint32_t n;
+       uint32_t n, stail;
        union __rte_ring_hts_pos np, op;
 
-       op.raw = rte_atomic_load_explicit(&d->ht.raw, rte_memory_order_acquire);
-
        do {
                /* Reset n to the initial burst count */
                n = num;
@@ -94,7 +110,20 @@ __rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
                 * make sure that we read prod head/tail *before*
                 * reading cons tail.
                 */
-               __rte_ring_hts_head_wait(d, &op);
+               /*
+                * A0: Synchronizes with the CAS at R1.
+                * Establishes a happens-before relationship with a thread of the same
+                * type that released the ht.raw, ensuring this thread observes all of
+                * its memory effects needed to maintain a safe partial order.
+                */
+               op = __rte_ring_hts_head_wait(d, rte_memory_order_acquire);
+
+               /*
+                * A1: Establishes a synchronizes-with edge with the store-release
+                * at R0. This ensures that all memory effects from the preceding
+                * opposing thread are observed.
+                */
+               stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
 
                /*
                 *  The subtraction is done between two unsigned 32bits value
@@ -102,7 +131,7 @@ __rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
                 * *old_head > cons_tail). So 'entries' is always between 0
                 * and capacity (which is < size).
                 */
-               *entries = capacity + s->tail - op.pos.head;
+               *entries = capacity + stail - op.pos.head;
 
                /* check that we have enough room in ring */
                if (unlikely(n > *entries))
@@ -116,14 +145,17 @@ __rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
                np.pos.head = op.pos.head + n;
 
        /*
-        * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-        *  - OOO reads of cons tail value
-        *  - OOO copy of elems from the ring
+        * R1: Establishes a synchronizes-with edge with the load-acquire
+        * of ht.raw at A0. This makes sure that the store-release to the
+        * tail by this thread, if it was of the opposite type, becomes
+        * visible to another thread of the current type. That thread will
+        * then observe the updates in the same order, keeping a safe
+        * partial order.
         */
        } while (rte_atomic_compare_exchange_strong_explicit(&d->ht.raw,
                        (uint64_t *)(uintptr_t)&op.raw, np.raw,
-                       rte_memory_order_acquire,
-                       rte_memory_order_acquire) == 0);
+                       rte_memory_order_release,
+                       rte_memory_order_relaxed) == 0);
 
        *old_head = op.pos.head;
        return n;
-- 
2.43.0
